#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Core document-processing logic, ported from the best-performing version.
Provides the complete DOCX text-extraction and translation-insertion features.

Author: PANJIT IT Team
Created: 2024-09-02
Modified: 2024-09-02
"""

import re
import sys
import time
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
from docx.text.paragraph import Paragraph
from docx.table import Table, _Cell
from docx.shared import Pt
from docx.oxml import OxmlElement
from docx.oxml.ns import qn, nsdecls
import docx

from app.utils.logger import get_logger
from app.utils.exceptions import FileProcessingError

logger = get_logger(__name__)

# ---------- Constants ----------
INSERT_FONT_SIZE_PT = 10
SENTENCE_MODE = True

# ---------- Optional dependencies detection ----------
try:
    import blingfire
    _HAS_BLINGFIRE = True
except ImportError:
    _HAS_BLINGFIRE = False

try:
    import pysbd
    _HAS_PYSBD = True
except ImportError:
    _HAS_PYSBD = False

# ---------- Helper functions ----------
def _has_cjk(text: str) -> bool:
    """Check if text contains CJK (Chinese/Japanese/Korean) characters."""
    for char in text:
        if ('\u4e00' <= char <= '\u9fff' or              # CJK Unified Ideographs
                '\u3400' <= char <= '\u4dbf' or          # CJK Extension A
                '\U00020000' <= char <= '\U0002a6df' or  # CJK Extension B (supplementary plane, needs the 8-digit \U escape)
                '\u3040' <= char <= '\u309f' or          # Hiragana
                '\u30a0' <= char <= '\u30ff' or          # Katakana
                '\uac00' <= char <= '\ud7af'):           # Hangul syllables
            return True
    return False
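
# A quick illustration of the detection above (doctest-style, illustrative):
#
#     >>> _has_cjk("Hello, world")
#     False
#     >>> _has_cjk("你好 world")
#     True
#     >>> _has_cjk("カタカナ")
#     True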

def _normalize_text(text: str) -> str:
    """Normalize text for comparison."""
    return re.sub(r'\s+', ' ', text.strip().lower())


def _append_after(p: Paragraph, text_block: str, italic: bool=True, font_size_pt: int=INSERT_FONT_SIZE_PT) -> Paragraph:
    """Insert a new paragraph after p and return it, so calls can be chained."""
    new_p = OxmlElement("w:p")
    p._p.addnext(new_p)
    np = Paragraph(new_p, p._parent)
    lines = text_block.split("\n")
    for i, line in enumerate(lines):
        run = np.add_run(line)
        if italic:
            run.italic = True
        if font_size_pt:
            run.font.size = Pt(font_size_pt)
        if i < len(lines) - 1:
            run.add_break()
    # Trailing zero-width space marks this paragraph as one of our inserts.
    tag = np.add_run("\u200b")
    if italic:
        tag.italic = True
    if font_size_pt:
        tag.font.size = Pt(font_size_pt)
    return np
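
# Illustrative sketch (not part of the original module): the zero-width
# marker added by _append_after is what _is_our_insert_block (defined below)
# looks for, so inserted translations can be recognized on a later pass.
# Assuming a python-docx Document `doc` with at least one plain paragraph:
#
#     para = doc.paragraphs[0]
#     inserted = _append_after(para, "Bonjour\nle monde", italic=True)
#     assert _is_our_insert_block(inserted)
#     assert not _is_our_insert_block(para)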

def _find_last_inserted_after(p: Paragraph, limit: int = 8) -> Optional[Paragraph]:
    """Find the last paragraph that was inserted after p (up to limit paragraphs)."""
    last_found = None
    try:
        # Get all paragraphs in the parent container
        if hasattr(p._parent, 'paragraphs'):
            all_paras = list(p._parent.paragraphs)
        else:
            # Handle cases where _parent doesn't have paragraphs (e.g., table cells)
            return None

        # Find p's index
        p_index = -1
        for i, para in enumerate(all_paras):
            if para._element == p._element:
                p_index = i
                break

        if p_index == -1:
            return None

        # Check paragraphs after p
        for i in range(p_index + 1, min(p_index + 1 + limit, len(all_paras))):
            if _is_our_insert_block(all_paras[i]):
                last_found = all_paras[i]
            else:
                break  # Stop at first non-inserted paragraph
    except Exception:
        return None

    return last_found

def _p_text_with_breaks(p: Paragraph) -> str:
    """Extract text from paragraph with line breaks preserved."""
    parts = []
    for node in p._element.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
        tag = node.tag.split('}', 1)[-1]
        if tag == "t":
            parts.append(node.text or "")
        elif tag == "br":
            parts.append("\n")
        elif tag == "tab":
            parts.append("\t")
    return "".join(parts)
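
# Example (illustrative): a paragraph whose run XML contains
# Hello<w:br/>World<w:tab/>! yields the string "Hello\nWorld\t!".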

def _get_cell_full_text(cell) -> str:
    """Extract the full text content of a table cell, covering every paragraph."""
    try:
        cell_texts = []
        for para in cell.paragraphs:
            para_text = _p_text_with_breaks(para)
            if para_text.strip():
                cell_texts.append(para_text.strip())

        # Join all paragraphs with newlines
        return '\n'.join(cell_texts)
    except Exception as e:
        logger.warning(f"Failed to extract cell text: {e}")
        return ""

def _is_our_insert_block_text(text: str) -> bool:
    """Check whether the text looks like one of our inserted translation blocks."""
    if not text:
        return False
    text_lower = text.lower().strip()
    # The Chinese markers below are matched against document content as-is.
    return (
        text_lower.startswith('【') or
        text_lower.startswith('[翻譯') or
        '翻譯:' in text_lower or
        'translation:' in text_lower or
        text_lower.startswith('translated:') or
        "\u200b" in text
    )


def _is_our_insert_block(p: Paragraph) -> bool:
    """Check if paragraph is our inserted translation (contains the zero-width marker)."""
    text = _p_text_with_breaks(p)
    return "\u200b" in text

def should_translate(text: str, src_lang: str) -> bool:
    """Determine if text should be translated based on content and source language."""
    text = text.strip()

    # Translate anything non-empty: the minimum length is 1 character.
    if len(text) < 1:
        return False

    # Skip pure numbers, dates, etc.
    if re.match(r'^[\d\s\.\-\:\/]+$', text):
        return False

    # For auto-detect, translate if the text has CJK or is long enough to be meaningful
    if src_lang.lower() in ('auto', 'auto-detect'):
        return _has_cjk(text) or len(text) > 5

    return True
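
# Behaviour sketch (doctest-style, illustrative):
#
#     >>> should_translate("2024-09-02", "auto")   # pure date/number: skipped
#     False
#     >>> should_translate("你好,世界", "auto")    # CJK content: translated
#     True
#     >>> should_translate("Hi", "en")              # explicit source language
#     True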

def _split_sentences(text: str, lang: str = 'auto') -> List[str]:
    """Split text into sentences using the best available library."""
    if not text.strip():
        return []

    # Try blingfire first
    if _HAS_BLINGFIRE and SENTENCE_MODE:
        try:
            sentences = blingfire.text_to_sentences(text).split('\n')
            sentences = [s.strip() for s in sentences if s.strip()]
            if sentences:
                return sentences
        except Exception as e:
            logger.warning(f"Blingfire failed: {e}")

    # Try pysbd
    if _HAS_PYSBD and SENTENCE_MODE:
        try:
            seg = pysbd.Segmenter(language="en" if lang == "auto" else lang)
            sentences = seg.segment(text)
            sentences = [s.strip() for s in sentences if s.strip()]
            if sentences:
                return sentences
        except Exception as e:
            logger.warning(f"PySBD failed: {e}")

    # Fallback: simple separator-based splitting
    separators = ['. ', '。', '!', '?', '!', '?', '\n']
    sentences = [text]

    for sep in separators:
        new_sentences = []
        for s in sentences:
            parts = s.split(sep)
            if len(parts) > 1:
                # Re-attach the separator (minus any trailing space) to each piece
                new_sentences.extend([p.strip() + sep.rstrip() for p in parts[:-1] if p.strip()])
                if parts[-1].strip():
                    new_sentences.append(parts[-1].strip())
            else:
                new_sentences.append(s)
        sentences = new_sentences

    return [s for s in sentences if len(s.strip()) > 3]
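
# Fallback splitting in action (illustrative; with blingfire or pysbd
# installed, the library path is taken instead):
#
#     >>> _split_sentences("First sentence. Second one. 這是中文。")
#     ['First sentence.', 'Second one.', '這是中文。']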

# ---------- Segment class ----------
class Segment:
    """Represents a translatable text segment in a document."""

    def __init__(self, kind: str, ref: Any, ctx: str, text: str):
        self.kind = kind  # 'para' | 'table_cell' | 'txbx'
        self.ref = ref    # Reference to the original document element
        self.ctx = ctx    # Context information (e.g. "Body > Tbl(r1,c2)")
        self.text = text  # Text content
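
# Examples of what the collector below produces (illustrative):
#
#     Segment("para", <Paragraph>, "Body", "原文段落")
#     Segment("table_cell", <_Cell>, "Body > Tbl(r1,c1)", "儲存格文字")
#     Segment("txbx", <txbxContent element>, "TextBox", "文字框內容")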

# ---------- TextBox helpers ----------
def _txbx_iter_texts(doc: docx.Document):
    """
    Yield (txbxContent_element, joined_source_text) pairs.
    - Deeply collects all descendant <w:p> under each txbxContent
    - Skips paragraphs we inserted ourselves (detected via the zero-width marker)
    - Keeps the remaining non-empty lines
    """
    def _p_text_flags(p_el):
        parts = []
        for node in p_el.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
            tag = node.tag.split('}', 1)[-1]
            if tag == "t":
                parts.append(node.text or "")
            elif tag == "br":
                parts.append("\n")
            else:
                parts.append(" ")
        text = "".join(parts)
        has_zero = ("\u200b" in text)
        runs = p_el.xpath(".//*[local-name()='r']")
        vis, ital = [], []
        for r in runs:
            rt = "".join([(t.text or "") for t in r.xpath(".//*[local-name()='t']")])
            if (rt or "").strip():
                vis.append(rt)
                ital.append(bool(r.xpath(".//*[local-name()='i']")))
        all_italic = (len(vis) > 0 and all(ital))
        return text, has_zero, all_italic

    for tx in doc._element.xpath(".//*[local-name()='txbxContent']"):
        kept = []
        for p in tx.xpath(".//*[local-name()='p']"):  # all descendant paragraphs
            text, has_zero, all_italic = _p_text_flags(p)  # all_italic currently informational only
            if not (text or "").strip():
                continue
            if has_zero:
                continue  # one of our inserted translations
            for line in text.split("\n"):
                if line.strip():
                    kept.append(line.strip())
        if kept:
            joined = "\n".join(kept)
            yield tx, joined

def _txbx_append_paragraph(tx, text_block: str, italic: bool = True, font_size_pt: int = INSERT_FONT_SIZE_PT):
    """Append a paragraph to textbox content."""
    p = OxmlElement("w:p")
    r = OxmlElement("w:r")
    rPr = OxmlElement("w:rPr")
    if italic:
        rPr.append(OxmlElement("w:i"))
    if font_size_pt:
        sz = OxmlElement("w:sz")
        # w:sz is expressed in half-points
        sz.set(qn("w:val"), str(int(font_size_pt * 2)))
        rPr.append(sz)
    r.append(rPr)
    lines = text_block.split("\n")
    for i, line in enumerate(lines):
        if i > 0:
            r.append(OxmlElement("w:br"))
        t = OxmlElement("w:t")
        t.set(qn("xml:space"), "preserve")
        t.text = line
        r.append(t)
    # Zero-width marker so the paragraph is recognized as ours later
    tag = OxmlElement("w:t")
    tag.set(qn("xml:space"), "preserve")
    tag.text = "\u200b"
    r.append(tag)
    p.append(r)
    tx.append(p)
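
# The paragraph built above serializes roughly as follows (illustrative,
# for a two-line block at the default 10 pt size; namespace prefixes omitted):
#
#     <w:p>
#       <w:r>
#         <w:rPr><w:i/><w:sz w:val="20"/></w:rPr>
#         <w:t xml:space="preserve">line one</w:t>
#         <w:br/>
#         <w:t xml:space="preserve">line two</w:t>
#         <w:t xml:space="preserve">[U+200B]</w:t>
#       </w:r>
#     </w:p>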

def _txbx_tail_equals(tx, translations: List[str]) -> bool:
    """Check if the textbox already ends with the expected translations."""
    paras = tx.xpath("./*[local-name()='p']")
    if len(paras) < len(translations):
        return False
    tail = paras[-len(translations):]
    for q, expect in zip(tail, translations):
        parts = []
        for node in q.xpath(".//*[local-name()='t' or local-name()='br']"):
            tag = node.tag.split("}", 1)[-1]
            parts.append("\n" if tag == "br" else (node.text or ""))
        if _normalize_text("".join(parts).strip()) != _normalize_text(expect):
            return False
    return True

# ---------- Main extraction logic ----------
def _get_paragraph_key(p: Paragraph) -> str:
    """Generate a stable unique key for paragraph deduplication."""
    try:
        # Use XML content hash + text content for stable deduplication
        xml_content = p._p.xml if hasattr(p._p, 'xml') else str(p._p)
        text_content = _p_text_with_breaks(p)
        return f"{hash(xml_content)}_{len(text_content)}_{text_content[:50]}"
    except Exception:
        # Fallback to a simple text-based key
        text_content = _p_text_with_breaks(p)
        return f"fallback_{hash(text_content)}_{len(text_content)}"

def _collect_docx_segments(doc: docx.Document) -> List[Segment]:
    """
    Enhanced segment collector with improved stability.
    Handles paragraphs, tables, textboxes, and SDT Content Controls.
    """
    segs: List[Segment] = []
    seen_par_keys = set()

    def _add_paragraph(p: Paragraph, ctx: str):
        try:
            p_key = _get_paragraph_key(p)
            if p_key in seen_par_keys:
                return

            txt = _p_text_with_breaks(p)
            if txt.strip() and not _is_our_insert_block(p):
                segs.append(Segment("para", p, ctx, txt))
                seen_par_keys.add(p_key)
        except Exception as e:
            # Log the error but keep processing the rest of the document
            logger.warning(f"Paragraph processing error: {e}; skipping this paragraph")

    def _process_container_content(container, ctx: str):
        """
        Recursively processes content within a container (body, cell, or SDT content).
        Identifies and handles paragraphs, tables, and SDT elements.
        """
        if container._element is None:
            return

        for child_element in container._element:
            qname = child_element.tag

            if qname.endswith('}p'):  # Paragraph
                p = Paragraph(child_element, container)
                _add_paragraph(p, ctx)

            elif qname.endswith('}tbl'):  # Table
                table = Table(child_element, container)
                for r_idx, row in enumerate(table.rows, 1):
                    for c_idx, cell in enumerate(row.cells, 1):
                        cell_ctx = f"{ctx} > Tbl(r{r_idx},c{c_idx})"

                        # Extract at cell granularity rather than paragraph by paragraph
                        cell_text = _get_cell_full_text(cell)
                        if cell_text.strip() and not _is_our_insert_block_text(cell_text):
                            segs.append(Segment("table_cell", cell, cell_ctx, cell_text))

            elif qname.endswith('}sdt'):  # Structured Document Tag (SDT)
                sdt_ctx = f"{ctx} > SDT"

                # 1. Extract the SDT's metadata text (placeholder, dropdown items)
                ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}

                # Extract placeholder text
                placeholder_texts = []
                for t in child_element.xpath('.//w:placeholder//w:t', namespaces=ns):
                    if t.text:
                        placeholder_texts.append(t.text)
                if placeholder_texts:
                    full_placeholder = "".join(placeholder_texts).strip()
                    if full_placeholder:
                        segs.append(Segment("para", child_element, f"{sdt_ctx}-Placeholder", full_placeholder))

                # Extract dropdown list items
                list_items = []
                for item in child_element.xpath('.//w:dropDownList/w:listItem', namespaces=ns):
                    display_text = item.get(qn('w:displayText'))
                    if display_text:
                        list_items.append(display_text)
                if list_items:
                    items_as_text = "\n".join(list_items)
                    segs.append(Segment("para", child_element, f"{sdt_ctx}-Dropdown", items_as_text))

                # 2. Recursively process the SDT's actual content (sdtContent)
                sdt_content_element = child_element.find(qn('w:sdtContent'))
                if sdt_content_element is not None:
                    class SdtContentWrapper:
                        def __init__(self, element, parent):
                            self._element = element
                            self._parent = parent

                    sdt_content_wrapper = SdtContentWrapper(sdt_content_element, container)
                    _process_container_content(sdt_content_wrapper, sdt_ctx)

    # --- Main execution starts here ---

    # 1. Process the main document body
    _process_container_content(doc._body, "Body")

    # 2. Process textboxes
    for tx, s in _txbx_iter_texts(doc):
        if s.strip() and (_has_cjk(s) or should_translate(s, 'auto')):
            segs.append(Segment("txbx", tx, "TextBox", s))

    return segs
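
# Collector walkthrough (illustrative; the file name is hypothetical):
#
#     doc = docx.Document("report.docx")
#     for seg in _collect_docx_segments(doc):
#         print(seg.kind, seg.ctx, seg.text[:40])
#
# A body paragraph prints as ('para', 'Body', ...), a cell of a table nested
# in an SDT as ('table_cell', 'Body > SDT > Tbl(r1,c1)', ...), and so on.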

def _insert_docx_translations(doc: docx.Document, segs: List[Segment],
                              tmap: Dict[Tuple[str, str], str],
                              targets: List[str], log=lambda s: None) -> Tuple[int, int]:
    """
    Insert translations into DOCX document segments.

    CRITICAL: This function contains the fix for the major translation insertion bug.
    The key fix is in the segment filtering logic - we now correctly check whether any
    target language has a translation available, using the proper key format
    (target_lang, text).

    Args:
        doc: The DOCX document object
        segs: List of segments to translate
        tmap: Translation map with keys of the form (target_language, source_text)
        targets: List of target languages, in order
        log: Logging function

    Returns:
        Tuple of (successful_insertions, skipped_insertions)

    Key bug fix:
        OLD (incorrect): if (seg.kind, seg.text) not in tmap and (targets[0], seg.text) not in tmap
        NEW (correct):   has_any_translation = any((tgt, seg.text) in tmap for tgt in targets)
    """
    ok_cnt = skip_cnt = 0

    # Helper to add a formatted run (plus our zero-width marker) to a paragraph
    def _add_formatted_run(p: Paragraph, text: str, italic: bool, font_size_pt: int):
        lines = text.split("\n")
        for i, line in enumerate(lines):
            run = p.add_run(line)
            if italic:
                run.italic = True
            if font_size_pt:
                run.font.size = Pt(font_size_pt)
            if i < len(lines) - 1:
                run.add_break()
        # Add our zero-width space marker
        tag_run = p.add_run("\u200b")
        if italic:
            tag_run.italic = True
        if font_size_pt:
            tag_run.font.size = Pt(font_size_pt)

    for seg in segs:
        # Check if any target language has a translation for this segment
        has_any_translation = any((tgt, seg.text) in tmap for tgt in targets)
        if not has_any_translation:
            log(f"[SKIP] No translation available: {seg.ctx} | {seg.text[:50]}...")
            skip_cnt += 1
            continue

        # Get translations for all targets, with a fallback for missing ones
        translations = []
        for tgt in targets:
            if (tgt, seg.text) in tmap:
                translations.append(tmap[(tgt, seg.text)])
            else:
                log(f"[WARNING] Missing {tgt} translation: {seg.text[:30]}...")
                # The Chinese-bracketed placeholder is written into the output
                # document, where _is_our_insert_block_text() can recognize it.
                translations.append(f"【翻譯查詢失敗|{tgt}】{seg.text[:50]}...")

        log(f"[INSERT] Preparing to insert {len(translations)} translation(s) into {seg.ctx}: {seg.text[:30]}...")

        if seg.kind == "para":
            # Check if this is an SDT segment (ref is an XML element, not a Paragraph)
            if hasattr(seg.ref, 'tag') and seg.ref.tag.endswith('}sdt'):
                # Handle SDT segments - insert translations into sdtContent
                sdt_element = seg.ref
                ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
                sdt_content = sdt_element.find(qn('w:sdtContent'))

                if sdt_content is not None:
                    # Collect translations we previously inserted
                    existing_paras = sdt_content.xpath('.//w:p', namespaces=ns)
                    existing_texts = []
                    for ep in existing_paras:
                        p_obj = Paragraph(ep, None)
                        if _is_our_insert_block(p_obj):
                            existing_texts.append(_p_text_with_breaks(p_obj))

                    # Skip if all translations already exist
                    if len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] SDT already contains translations: {seg.text[:30]}...")
                            continue

                    # Add translations to the SDT content
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            # Create a new paragraph in the SDT content
                            new_p_element = OxmlElement("w:p")
                            sdt_content.append(new_p_element)
                            new_p = Paragraph(new_p_element, None)
                            _add_formatted_run(new_p, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)

                    ok_cnt += 1
                    log("[SUCCESS] Inserted translations into SDT (interleaved format)")
                continue

            p: Paragraph = seg.ref

            # --- Context-aware insertion logic (from the successful version) ---
            # Check if the paragraph's parent is a table cell
            if isinstance(p._parent, _Cell):
                cell = p._parent

                try:
                    # Find the current paragraph's position in the cell
                    cell_paragraphs = list(cell.paragraphs)
                    p_index = -1
                    for idx, cell_p in enumerate(cell_paragraphs):
                        if cell_p._element == p._element:
                            p_index = idx
                            break

                    if p_index == -1:
                        log("[WARNING] Could not locate the paragraph within the cell; falling back to appending")
                        # Fallback to the original method
                        for block in translations:
                            new_p = cell.add_paragraph()
                            _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                        ok_cnt += 1
                        continue

                    # Check if translations already exist right after this paragraph
                    existing_texts = []
                    check_limit = min(p_index + 1 + len(translations), len(cell_paragraphs))
                    for idx in range(p_index + 1, check_limit):
                        if _is_our_insert_block(cell_paragraphs[idx]):
                            existing_texts.append(_p_text_with_breaks(cell_paragraphs[idx]))

                    # Skip if all translations already exist, in order
                    if len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] Table cell already contains translations: {seg.text[:30]}...")
                            continue

                    # Determine which translations still need to be added
                    to_add = []
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            to_add.append(t)

                    if not to_add:
                        skip_cnt += 1
                        log(f"[SKIP] All translations already present in table cell: {seg.text[:30]}...")
                        continue

                    # Insert new paragraphs right after the current paragraph
                    insert_after = p
                    for block in to_add:
                        try:
                            # Create a new paragraph and insert it after the current position
                            new_p_element = OxmlElement("w:p")
                            insert_after._element.addnext(new_p_element)
                            new_p = Paragraph(new_p_element, cell)
                            _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                            insert_after = new_p  # Update position for the next insertion
                        except Exception as e:
                            log(f"[ERROR] Table insertion failed: {e}; trying fallback method")
                            # Fallback: append at the end of the cell
                            try:
                                new_p = cell.add_paragraph()
                                _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                                log("[SUCCESS] Fallback insertion succeeded")
                            except Exception as e2:
                                log(f"[FATAL] Fallback insertion also failed: {e2}")
                                continue
                    ok_cnt += 1
                    log(f"[SUCCESS] Inserted {len(to_add)} translation(s) into table cell (right after the source text)")

                except Exception as e:
                    log(f"[ERROR] Table handling failed entirely: {e}; skipping this paragraph")
                    continue

            else:
                # Normal paragraph (not in a table cell) - SIMPLIFIED FOR DEBUGGING
                try:
                    # TEMPORARILY DISABLED: the existing-translation check, to force insertion
                    log(f"[DEBUG] Force-inserting translations into paragraph: {seg.text[:30]}...")

                    # Force all translations to be added
                    to_add = translations

                    # Use simple positioning - always insert after the current paragraph
                    anchor = p

                    for block in to_add:
                        try:
                            log(f"[DEBUG] Attempting insertion: {block[:50]}...")
                            anchor = _append_after(anchor, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                            log("[SUCCESS] _append_after inserted successfully")
                        except Exception as e:
                            log(f"[ERROR] _append_after failed: {e}; trying simplified insertion")
                            try:
                                # Fallback: simple append to the parent container
                                if hasattr(p._parent, 'add_paragraph'):
                                    new_p = p._parent.add_paragraph()
                                    _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                                    log("[SUCCESS] Fallback paragraph insertion succeeded")
                                else:
                                    log("[ERROR] No fallback insertion available")
                            except Exception as e2:
                                log(f"[FATAL] Fallback insertion also failed: {e2}")
                                continue

                    ok_cnt += 1
                    log(f"[SUCCESS] Force-inserted {len(to_add)} translation(s) into paragraph")

                except Exception as e:
                    log(f"[ERROR] Paragraph handling failed: {e}; skipping this paragraph")
                    continue

        elif seg.kind == "table_cell":
            # Insert translations for a whole table cell
            cell = seg.ref  # cell is a _Cell object

            # Collect any translations already at the end of the cell
            existing_translations = []
            cell_paragraphs = list(cell.paragraphs)

            translation_start_index = len(cell_paragraphs)
            for i in range(len(cell_paragraphs) - 1, -1, -1):
                if _is_our_insert_block(cell_paragraphs[i]):
                    existing_translations.insert(0, _p_text_with_breaks(cell_paragraphs[i]))
                    translation_start_index = i
                else:
                    break

            # Skip if all translations already exist and match
            if len(existing_translations) >= len(translations):
                if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_translations[:len(translations)], translations)):
                    skip_cnt += 1
                    log(f"[SKIP] Table cell already contains translations: {seg.text[:30]}...")
                    continue

            # Remove any stale translation paragraphs
            for i in range(len(cell_paragraphs) - 1, translation_start_index - 1, -1):
                if _is_our_insert_block(cell_paragraphs[i]):
                    cell._element.remove(cell_paragraphs[i]._element)

            # Detect a simple short-text cell (only the source text, no complex structure)
            cell_content = cell.text.strip()
            is_simple_cell = len(cell_content) <= 10 and cell_content == seg.text.strip()

            if is_simple_cell:
                # For simple short text, replace the content in place instead of appending paragraphs
                log(f"[INFO] Simple cell content replacement: '{seg.text.strip()}' -> '{translations[0] if translations else 'N/A'}'")

                # Clear all paragraph content
                for para in cell.paragraphs:
                    para.clear()

                # Put the source text and translations into the first paragraph
                first_para = cell.paragraphs[0] if cell.paragraphs else cell.add_paragraph()

                # Add the source text
                run_orig = first_para.add_run(seg.text.strip())

                # Add a line break plus each translation
                for t in translations:
                    first_para.add_run('\n')
                    run_trans = first_para.add_run(t)
                    run_trans.italic = True
                    if INSERT_FONT_SIZE_PT:
                        run_trans.font.size = Pt(INSERT_FONT_SIZE_PT)

                # Add the zero-width marker
                tag_run = first_para.add_run("\u200b")
                tag_run.italic = True
                if INSERT_FONT_SIZE_PT:
                    tag_run.font.size = Pt(INSERT_FONT_SIZE_PT)
            else:
                # For complex cells, append translation paragraphs as before
                for t in translations:
                    new_p = cell.add_paragraph()
                    _add_formatted_run(new_p, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)

            ok_cnt += 1
            log(f"[SUCCESS] Inserted {len(translations)} translation(s) into table cell")

        elif seg.kind == "txbx":
            tx = seg.ref
            # Skip if the textbox already has our translations at the end
            if _txbx_tail_equals(tx, translations):
                skip_cnt += 1
                log(f"[SKIP] Textbox already contains translations: {seg.text[:30]}...")
                continue

            # Append translations to the textbox
            for t in translations:
                _txbx_append_paragraph(tx, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)

            ok_cnt += 1
            log(f"[SUCCESS] Inserted {len(translations)} translation(s) into textbox")

    return ok_cnt, skip_cnt
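
# Shape of the translation map expected above (illustrative):
#
#     tmap = {
#         ("en", "這是原文。"): "This is the source text.",
#         ("vi", "這是原文。"): "Đây là văn bản gốc.",
#     }
#     ok, skipped = _insert_docx_translations(doc, segs, tmap, ["en", "vi"], print)
#
# Each segment receives one inserted block per target language, in the order
# given by `targets`; segments with no entry for any target are skipped.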

# ---------- Main DocumentProcessor class ----------
class DocumentProcessor:
    """Enhanced document processor with complete DOCX handling capabilities."""

    def __init__(self):
        self.logger = logger

    def extract_docx_segments(self, file_path: str) -> List[Segment]:
        """Extract all translatable segments from a DOCX file."""
        try:
            doc = docx.Document(file_path)
            segments = _collect_docx_segments(doc)

            self.logger.info(f"Extracted {len(segments)} segments from {file_path}")
            for seg in segments[:5]:  # Log the first 5 segments for debugging
                self.logger.debug(f"Segment: {seg.kind} | {seg.ctx} | {seg.text[:50]}...")

            return segments

        except Exception as e:
            self.logger.error(f"Failed to extract DOCX segments from {file_path}: {str(e)}")
            raise FileProcessingError(f"DOCX document analysis failed: {str(e)}")

    def _rematch_segments_to_document(self, doc: docx.Document, old_segments: List[Segment]) -> List[Segment]:
        """Re-match segments from an old document instance to a new document instance."""
        try:
            # Extract fresh segments from the current document instance
            fresh_segments = _collect_docx_segments(doc)

            # Match old segments with fresh segments based on text content
            matched_segments = []

            for old_seg in old_segments:
                # Find the matching segment among the fresh segments
                matched = False
                for fresh_seg in fresh_segments:
                    if (old_seg.kind == fresh_seg.kind and
                            old_seg.ctx == fresh_seg.ctx and
                            _normalize_text(old_seg.text) == _normalize_text(fresh_seg.text)):
                        matched_segments.append(fresh_seg)
                        matched = True
                        break

                if not matched:
                    self.logger.warning(f"Failed to match segment: {old_seg.text[:50]}...")
                    # Keep the old segment anyway, though insertion may fail for it
                    matched_segments.append(old_seg)

            self.logger.debug(f"Re-matched {len(matched_segments)} segments to the current document")
            return matched_segments

        except Exception as e:
            self.logger.error(f"Failed to re-match segments: {str(e)}")
            # Return the original segments as a fallback
            return old_segments

    def insert_docx_translations(self, file_path: str, segments: List[Segment],
                                 translation_map: Dict[Tuple[str, str], str],
                                 target_languages: List[str], output_path: str) -> Tuple[int, int]:
        """Insert translations into a DOCX file and save it to output_path."""
        try:
            doc = docx.Document(file_path)

            # CRITICAL FIX: Re-match segments with the current document instance.
            # The original segments were extracted from a different document instance.
            matched_segments = self._rematch_segments_to_document(doc, segments)

            def log_func(msg: str):
                self.logger.debug(msg)

            ok_count, skip_count = _insert_docx_translations(
                doc, matched_segments, translation_map, target_languages, log_func
            )

            # Save the modified document
            doc.save(output_path)

            self.logger.info(f"Inserted {ok_count} translations, skipped {skip_count}. Saved to: {output_path}")
            return ok_count, skip_count

        except Exception as e:
            self.logger.error(f"Failed to insert DOCX translations: {str(e)}")
            raise FileProcessingError(f"DOCX translation insertion failed: {str(e)}")

    def split_text_into_sentences(self, text: str, language: str = 'auto') -> List[str]:
        """Split text into sentences using the best available method."""
        return _split_sentences(text, language)

    def should_translate_text(self, text: str, source_language: str) -> bool:
        """Determine if text should be translated."""
        return should_translate(text, source_language)

    def insert_docx_combined_translations(self, file_path: str, segments: List[Segment],
                                          translation_map: Dict[Tuple[str, str], str],
                                          target_languages: List[str], output_path: str) -> Tuple[int, int]:
        """Insert all translations into a single DOCX file as combined multi-language output.

        Produces a combined file in which each original text is followed by all of its
        translations, in the form: original, then English, then Vietnamese, and so on.
        """
        try:
            doc = docx.Document(file_path)

            # Re-match segments with the current document instance
            matched_segments = self._rematch_segments_to_document(doc, segments)

            def log_func(msg: str):
                self.logger.debug(msg)

            # _insert_docx_translations already supports multiple target
            # languages in a single document
            ok_count, skip_count = _insert_docx_translations(
                doc, matched_segments, translation_map, target_languages, log_func
            )

            # Save the combined document
            doc.save(output_path)

            self.logger.info(f"Generated combined multi-language file: {output_path}")
            self.logger.info(f"Inserted {ok_count} translations, skipped {skip_count}")
            return ok_count, skip_count

        except Exception as e:
            self.logger.error(f"Failed to create combined DOCX translations: {str(e)}")
            raise FileProcessingError(f"Combined multi-language DOCX generation failed: {str(e)}")
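

# Minimal end-to-end usage sketch (illustrative; the file names are
# hypothetical, and the translation map would normally come from a real
# translation backend rather than the placeholder strings below):
if __name__ == "__main__":
    processor = DocumentProcessor()
    segments = processor.extract_docx_segments("input.docx")
    # Fake "translations" just to exercise the insertion path
    tmap = {("en", seg.text): f"[EN] {seg.text}" for seg in segments}
    ok, skipped = processor.insert_docx_translations(
        "input.docx", segments, tmap, ["en"], "output.docx"
    )
    print(f"inserted={ok}, skipped={skipped}")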