#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Document Translator - Robust GUI (Dify)
- Dedup + soft-skip + only-supplement insertion
- Full paragraph discovery (tables/SDT/lists/nested) with textbox exclusion
- TextBox deep parse with safe filtering (skip our inserted translations)
- Orderable target languages (GUI)
- Word COM only for header/footer shapes (optional)
"""

import os
import sys
import re
import time
import threading
import queue
import sqlite3
from pathlib import Path
from typing import List, Tuple, Optional, Dict

from docx.table import _Cell
import requests

# ---------- Optional deps ----------
try:
    import pythoncom
    import win32com.client as win32
    from win32com.client import constants as c
    _WIN32COM_AVAILABLE = (sys.platform == "win32")
except Exception:
    _WIN32COM_AVAILABLE = False

try:
    import blingfire
    _HAS_BLINGFIRE = True
except Exception:
    _HAS_BLINGFIRE = False

try:
    import pysbd
    _HAS_PYSBD = True
except Exception:
    _HAS_PYSBD = False

# ---------- Office libs ----------
import docx
from docx.text.paragraph import Paragraph
from docx.table import Table
from docx.shared import Pt
from docx.oxml import OxmlElement
from docx.oxml.ns import qn

import pptx
from pptx.util import Pt as PPTPt

import openpyxl
from openpyxl.styles import Alignment
from openpyxl.comments import Comment

from PyPDF2 import PdfReader

# ---------- App constants ----------
APP_TITLE = "Document Translator (Robust, Dify)"
DEFAULT_OUTPUT_DIR = "translated_files"
SUPPORTED = {".docx", ".doc", ".pptx", ".xlsx", ".xls", ".pdf"}

# API config is read from api.txt
DIFY_API_BASE_URL = ""
DIFY_API_KEY = ""

# ---------- Tunables ----------
API_CONNECT_TIMEOUT_S = 10
API_READ_TIMEOUT_S = 60
API_ATTEMPTS = 3
API_BACKOFF_BASE = 1.6
SENTENCE_MODE = True
INSERT_FONT_SIZE_PT = 10
EXCEL_FORMULA_MODE = "skip"  # "skip" | "comment"
MAX_SHAPE_CHARS = 1200

# ---------- Load API config ----------
def load_api_config_from_file():
    global DIFY_API_BASE_URL, DIFY_API_KEY
    try:
        with open("api.txt", "r", encoding="utf-8") as f:
            for line in f:
                if line.startswith("base_url:"):
                    DIFY_API_BASE_URL = line.split(":", 1)[1].strip()
                elif line.startswith("api:"):
                    DIFY_API_KEY = line.split(":", 1)[1].strip()
    except FileNotFoundError:
        pass
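
# Example api.txt (illustrative values, not real credentials):
#
#     base_url: https://dify.example.com/v1
#     api: app-xxxxxxxxxxxxxxxx
#
# Only lines starting with "base_url:" and "api:" are read; anything else in
# the file is ignored, and a missing file leaves both values empty.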

# ---------- Cache ----------
class TranslationCache:
    def __init__(self, db_path: Path):
        self.conn = sqlite3.connect(str(db_path), check_same_thread=False)
        self.lock = threading.Lock()
        with self.lock:
            cur = self.conn.cursor()
            cur.execute("""
                CREATE TABLE IF NOT EXISTS translations(
                    src TEXT NOT NULL,
                    tgt TEXT NOT NULL,
                    text TEXT NOT NULL,
                    result TEXT NOT NULL,
                    PRIMARY KEY (src, tgt, text)
                )
            """)
            self.conn.commit()

    def get(self, src: str, tgt: str, text: str) -> Optional[str]:
        with self.lock:
            cur = self.conn.cursor()
            cur.execute("SELECT result FROM translations WHERE src=? AND tgt=? AND text=?",
                        (src, tgt, text))
            r = cur.fetchone()
            return r[0] if r else None

    def put(self, src: str, tgt: str, text: str, result: str):
        with self.lock:
            cur = self.conn.cursor()
            cur.execute("INSERT OR REPLACE INTO translations (src, tgt, text, result) VALUES (?, ?, ?, ?)",
                        (src, tgt, text, result))
            self.conn.commit()

    def close(self):
        with self.lock:
            try: self.conn.close()
            except Exception: pass
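
# Minimal usage sketch (assumes a writable working directory):
#
#     cache = TranslationCache(Path("translation_cache.db"))
#     cache.put("auto", "English", "你好", "Hello")
#     cache.get("auto", "English", "你好")     # -> "Hello"
#     cache.get("auto", "English", "missing")  # -> None
#     cache.close()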

# ---------- Text utils ----------
def _normalize_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip()).lower()


def should_translate(text, source_lang: str) -> bool:
    """
    Translation decision:
    - If source_lang starts with 'en' (English): translate any non-empty text (letters/digits/etc.).
    - If source_lang starts with 'auto' or is empty: translate any non-empty alnum-containing text.
    - Else (non-English): translate unless the text is ALL English letters OR ALL digits.
    """
    if not text:
        return False
    if not str(text).strip():
        return False
    s = (source_lang or "").strip().lower()
    filtered = "".join(ch for ch in str(text) if ch.isalnum())
    if not filtered:
        return False
    if s.startswith("en"):
        return True
    if s.startswith("auto") or s == "":
        return True
    ASCII = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    if all((ch in ASCII) for ch in filtered):
        return False
    if filtered.isdigit():
        return False
    return True
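
# Illustrative expectations (not executed):
#
#     should_translate("Hello world", "English")          # True  (English source: translate everything)
#     should_translate("Hello world", "auto")             # True
#     should_translate("ABC", "Traditional Chinese")      # False (all English letters)
#     should_translate("2024", "Japanese")                # False (all digits)
#     should_translate("ABC123", "Japanese")              # True  (mixed letters+digits is neither case)
#     should_translate("你好", "Traditional Chinese")      # True
#     should_translate("   ", "auto")                     # False (no content)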

def _has_cjk(s: str) -> bool:
    return any('\u4e00' <= ch <= '\u9fff' for ch in s or "")


def _split_sentences(line: str, lang_hint: Optional[str]) -> List[str]:
    line = line or ""
    if not line.strip():
        return []
    if _HAS_BLINGFIRE:
        try:
            s = blingfire.text_to_sentences(line)
            arr = [t.strip() for t in s.split("\n") if t.strip()]
            if arr: return arr
        except Exception:
            pass
    if _HAS_PYSBD:
        try:
            seg = pysbd.Segmenter(language="en", clean=False)
            arr = [t.strip() for t in seg.segment(line) if t.strip()]
            if arr: return arr
        except Exception:
            pass
    # fallback: simple punctuation heuristic
    out, buf = [], ""
    for ch in line:
        buf += ch
        if ch in "。!?" or ch in ".!?":
            out.append(buf.strip()); buf = ""
    if buf.strip(): out.append(buf.strip())
    return out
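
# Fallback splitting example (applies only when neither blingfire nor pysbd is
# installed): "今天下雨。我們在家.Let's go!" yields
# ["今天下雨。", "我們在家.", "Let's go!"]: one chunk per terminal punctuation
# mark, with any trailing remainder kept as a final chunk.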

# ---------- API ----------
class ApiError(Exception): pass


class DifyClient:
    def __init__(self, base_url: str, api_key: str, log=lambda s: None):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key.strip()
        self.log = log
        self._resolved_path = None

    def _headers(self):
        return {"Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"}

    def _try_post(self, path: str, payload: dict) -> requests.Response:
        url = f"{self.base_url}{path}"
        return requests.post(url, headers=self._headers(), json=payload,
                             timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S))

    def _detect_endpoint(self) -> str:
        base_has_v1 = self.base_url.rstrip("/").endswith("/v1")
        candidates = ["/chat-messages"] if base_has_v1 else ["/v1/chat-messages", "/chat-messages"]
        payload = {"inputs": {}, "query": "ping", "user": "health-check", "response_mode": "blocking"}
        for path in candidates:
            try:
                r = self._try_post(path, payload)
                if r.status_code in (200, 401, 403):
                    self._resolved_path = path
                    self.log(f"[API Detect] use {path} (HTTP {r.status_code})")
                    return path
                if r.status_code in (404, 405):
                    self.log(f"[API Detect] {path} not usable (HTTP {r.status_code}), trying next...")
                    continue
                self.log(f"[API Detect] {path} unexpected HTTP {r.status_code}: {r.text[:180]}")
            except requests.exceptions.RequestException as e:
                self.log(f"[API Detect] {path} request error: {e}")
        # Fall back to the first candidate so a base URL that already ends in
        # /v1 does not get a doubled prefix.
        self._resolved_path = candidates[0]
        self.log(f"[API Detect] fallback to {self._resolved_path}")
        return self._resolved_path

    def health_check(self) -> Tuple[bool, str]:
        path = self._detect_endpoint()
        payload = {"inputs": {}, "query": "健康檢查 health check", "user": "health-check", "response_mode": "blocking"}
        try:
            r = self._try_post(path, payload)
            if r.status_code == 200:
                try:
                    data = r.json()
                    ans = data.get("answer", "")
                    return True, f"OK via {path}; answer len={len(ans)}"
                except Exception as e:
                    return False, f"Health JSON parse error via {path}: {e}"
            else:
                return False, f"HTTP {r.status_code} via {path}: {r.text[:180]}"
        except requests.exceptions.RequestException as e:
            return False, f"Request error via {path}: {e}"

    def translate_once(self, text: str, tgt: str, src_lang: Optional[str]) -> Tuple[bool, str]:
        if self._resolved_path is None:
            self._detect_endpoint()
        prompt = self._build_prompt(text, tgt, src_lang)
        payload = {"inputs": {}, "query": prompt, "user": "doc-translator-user", "response_mode": "blocking"}
        last = None
        for attempt in range(1, API_ATTEMPTS + 1):
            try:
                r = self._try_post(self._resolved_path, payload)
                if r.status_code == 200:
                    data = r.json()
                    ans = data.get("answer")
                    if isinstance(ans, str):
                        return True, ans
                    last = f"Invalid JSON: {data}"
                else:
                    last = f"HTTP {r.status_code}: {r.text[:240]}"
            except requests.exceptions.RequestException as e:
                last = str(e)
            time.sleep(API_BACKOFF_BASE * attempt)
        return False, str(last)

    @staticmethod
    def _build_prompt(text: str, target_language: str, source_language: Optional[str]) -> str:
        sl = source_language if (source_language and source_language.lower() not in ("auto", "auto-detect", "auto detect")) else "Auto"
        return (
            f"Task: Translate ONLY into {target_language} from {sl}.\n"
            f"Rules:\n"
            f"1) Output translation text ONLY (no source text, no notes, no questions, no language-detection remarks).\n"
            f"2) Preserve original line breaks.\n"
            f"3) Do NOT wrap in quotes or code blocks.\n\n"
            f"{text}"
        )


class OllamaClient:
    def __init__(self, base_url: str = "http://localhost:11434", model: str = "gpt-oss:latest", log=lambda s: None):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.log = log

    def _gen_url(self, path: str) -> str:
        return f"{self.base_url}{path}"

    def health_check(self) -> Tuple[bool, str]:
        try:
            r = requests.get(self._gen_url("/api/tags"), timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S))
            if r.status_code == 200:
                names = [m.get("name", "") for m in (r.json().get("models") or []) if isinstance(m, dict)]
                return True, f"OK; models={', '.join(names[:6]) + ('...' if len(names) > 6 else '')}"
            else:
                return False, f"HTTP {r.status_code}: {r.text[:180]}"
        except requests.exceptions.RequestException as e:
            return False, f"Request error: {e}"

    def translate_once(self, text: str, tgt: str, src_lang: Optional[str]) -> Tuple[bool, str]:
        prompt = DifyClient._build_prompt(text, tgt, src_lang)
        payload = {"model": self.model, "prompt": prompt, "stream": False}
        last = None
        for attempt in range(1, API_ATTEMPTS + 1):
            try:
                r = requests.post(self._gen_url("/api/generate"), json=payload,
                                  timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S))
                if r.status_code == 200:
                    data = r.json()
                    ans = data.get("response", "")
                    return True, ans.strip()
                last = f"HTTP {r.status_code}: {r.text[:180]}"
            except requests.exceptions.RequestException as e:
                last = f"Request error: {e}"
            # Back off between attempts, mirroring DifyClient.translate_once
            time.sleep(API_BACKOFF_BASE * attempt)
        return False, str(last)


def list_ollama_models(base_url: str = "http://localhost:11434") -> list:
    try:
        r = requests.get(base_url.rstrip("/") + "/api/tags", timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S))
        if r.status_code == 200:
            return [m.get("name", "") for m in (r.json().get("models") or []) if isinstance(m, dict)]
    except Exception:
        pass
    return ["gpt-oss:latest"]

# ---------- High-level translate helpers ----------
def translate_block_sentencewise(text: str, tgt: str, src_lang: Optional[str],
                                 cache: TranslationCache, client: DifyClient) -> Tuple[bool, str]:
    """
    Translate a multi-line block line-by-line, sentence-wise; cache per sentence.
    `client` may be a DifyClient or an OllamaClient (anything with translate_once()).
    Returns (all_ok, joined_result).
    """
    if not text or not text.strip():
        return True, ""
    src_key = (src_lang or "auto").lower()

    # Whole-block cache first
    cached_whole = cache.get(src_key, tgt, text)
    if cached_whole is not None:
        return True, cached_whole

    out_lines: List[str] = []
    all_ok = True

    for raw_line in text.split("\n"):
        if not raw_line.strip():
            out_lines.append("")
            continue
        sentences = _split_sentences(raw_line, src_lang) or [raw_line]
        parts = []
        for s in sentences:
            cached = cache.get(src_key, tgt, s)
            if cached is not None:
                parts.append(cached)
                continue
            ok, ans = client.translate_once(s, tgt, src_lang)
            if not ok:
                all_ok = False
                ans = f"【翻譯失敗|{tgt}】{s}"
            else:
                cache.put(src_key, tgt, s, ans)
            parts.append(ans)
        out_lines.append(" ".join(parts))

    final = "\n".join(out_lines)
    if all_ok:
        cache.put(src_key, tgt, text, final)
    return all_ok, final
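
# Caching note (illustrative): a call such as
#
#     translate_block_sentencewise("Hello. Bye.\n再見", "Vietnamese", None, cache, client)
#
# caches each sentence ("Hello.", "Bye.", "再見") individually and, if every
# sentence succeeded, also caches the whole block under its full text, so a
# rerun over the same document resolves it with a single lookup.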

# ---------- DOCX primitives ----------
def _p_text_with_breaks(p: Paragraph) -> str:
    """Read paragraph text including soft line breaks and tabs."""
    parts = []
    for node in p._p.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
        tag = node.tag.split("}", 1)[-1]
        if tag == "t":
            parts.append(node.text or "")
        elif tag == "br":
            parts.append("\n")
        else:  # tab
            parts.append(" ")
    return "".join(parts).strip()


def _append_after(p: Paragraph, text_block: str, italic: bool = True, font_size_pt: int = INSERT_FONT_SIZE_PT) -> Paragraph:
    """Insert a new paragraph after p; return the new paragraph (for chained inserts)."""
    new_p = OxmlElement("w:p")
    p._p.addnext(new_p)
    np = Paragraph(new_p, p._parent)
    lines = text_block.split("\n")
    for i, line in enumerate(lines):
        run = np.add_run(line)
        if italic: run.italic = True
        if font_size_pt: run.font.size = Pt(font_size_pt)
        if i < len(lines) - 1:
            run.add_break()
    # A zero-width space marks the paragraph as one of our inserted translations
    tag = np.add_run("\u200b")
    if italic: tag.italic = True
    if font_size_pt: tag.font.size = Pt(font_size_pt)
    return np


def _is_our_insert_block(p: Paragraph) -> bool:
    """Return True iff the paragraph contains our zero-width marker."""
    return any("\u200b" in (r.text or "") for r in p.runs)
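
# Marker round-trip sketch: _append_after() tags every paragraph it creates
# with a zero-width space ("\u200b"), and _is_our_insert_block() keys on that
# tag, so a rerun of the translator can skip paragraphs it inserted earlier:
#
#     inserted = _append_after(p, "Bonjour")
#     _is_our_insert_block(inserted)   # True  -> skipped on the next pass
#     _is_our_insert_block(p)          # False for a typical source paragraph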

def _find_last_inserted_after(p: Paragraph, limit: int = 8) -> Optional[Paragraph]:
    """Return the last of our inserted paragraphs directly after p, else None."""
    ptr = p._p.getnext()
    last = None
    steps = 0
    while ptr is not None and steps < limit:
        if ptr.tag.endswith("}p"):
            q = Paragraph(ptr, p._parent)
            if _is_our_insert_block(q):
                last = q
            steps += 1
            ptr = ptr.getnext()
            continue
        break
    return last


def _scan_our_tail_texts(p: Paragraph, limit: int = 8) -> List[str]:
    """Return texts of our inserted paragraphs right after p (up to limit)."""
    ptr = p._p.getnext()
    out = []
    steps = 0
    while ptr is not None and steps < limit:
        if ptr.tag.endswith("}p"):
            q = Paragraph(ptr, p._parent)
            if _is_our_insert_block(q):
                out.append(_p_text_with_breaks(q))
            steps += 1
            ptr = ptr.getnext()
            continue
        break
    return out


# ---------- TextBox helpers ----------
def _txbx_iter_texts(doc: docx.Document):
    """
    Yield (txbxContent_element, joined_source_text).
    - Deeply collect all descendant <w:p> under txbxContent.
    - Skip our inserted translations: paragraphs carrying the zero-width
      marker, or all-italic paragraphs with no CJK text.
    - Keep the remaining non-empty lines.
    """
    def _p_text_flags(p_el):
        parts = []
        for node in p_el.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
            tag = node.tag.split('}', 1)[-1]
            if tag == "t": parts.append(node.text or "")
            elif tag == "br": parts.append("\n")
            else: parts.append(" ")
        text = "".join(parts)
        has_zero = ("\u200b" in text)
        runs = p_el.xpath(".//*[local-name()='r']")
        vis, ital = [], []
        for r in runs:
            rt = "".join([(t.text or "") for t in r.xpath(".//*[local-name()='t']")])
            if (rt or "").strip():
                vis.append(rt); ital.append(bool(r.xpath(".//*[local-name()='i']")))
        all_italic = (len(vis) > 0 and all(ital))
        return text, has_zero, all_italic

    for tx in doc._element.xpath(".//*[local-name()='txbxContent']"):
        kept = []
        for p in tx.xpath(".//*[local-name()='p']"):  # all descendant paragraphs
            text, has_zero, all_italic = _p_text_flags(p)
            if not (text or "").strip():
                continue
            if has_zero:
                continue  # our inserted translation
            if all_italic and not _has_cjk(text):
                continue  # likely an inserted translation from an older run
            for line in text.split("\n"):
                if line.strip():
                    kept.append(line.strip())
        if kept:
            joined = "\n".join(kept)
            yield tx, joined


def _txbx_append_paragraph(tx, text_block: str, italic: bool = True, font_size_pt: int = INSERT_FONT_SIZE_PT):
    p = OxmlElement("w:p")
    r = OxmlElement("w:r")
    rPr = OxmlElement("w:rPr")
    if italic: rPr.append(OxmlElement("w:i"))
    if font_size_pt:
        sz = OxmlElement("w:sz"); sz.set(qn("w:val"), str(int(font_size_pt * 2))); rPr.append(sz)
    r.append(rPr)
    lines = text_block.split("\n")
    for i, line in enumerate(lines):
        if i > 0: r.append(OxmlElement("w:br"))
        t = OxmlElement("w:t"); t.set(qn("xml:space"), "preserve"); t.text = line; r.append(t)
    tag = OxmlElement("w:t"); tag.set(qn("xml:space"), "preserve"); tag.text = "\u200b"; r.append(tag)
    p.append(r); tx.append(p)
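
# Shape of the XML this helper appends (one run, soft line breaks, and the
# trailing zero-width marker), shown informally for a two-line block at 10pt
# (w:sz is in half-points, hence val="20"):
#
#     <w:p>
#       <w:r>
#         <w:rPr><w:i/><w:sz w:val="20"/></w:rPr>
#         <w:t xml:space="preserve">line 1</w:t>
#         <w:br/>
#         <w:t xml:space="preserve">line 2</w:t>
#         <w:t xml:space="preserve">&#8203;</w:t>   <!-- zero-width space -->
#       </w:r>
#     </w:p>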

def _txbx_tail_equals(tx, translations: List[str]) -> bool:
    paras = tx.xpath("./*[local-name()='p']")
    if len(paras) < len(translations): return False
    tail = paras[-len(translations):]
    for q, expect in zip(tail, translations):
        parts = []
        for node in q.xpath(".//*[local-name()='t' or local-name()='br']"):
            tag = node.tag.split("}", 1)[-1]
            parts.append("\n" if tag == "br" else (node.text or ""))
        if _normalize_text("".join(parts).strip()) != _normalize_text(expect):
            return False
    return True


# ---------- Two-phase model for DOCX ----------
class Segment:
    def __init__(self, kind: str, ref, ctx: str, text: str):
        self.kind = kind  # 'para' | 'txbx'
        self.ref = ref
        self.ctx = ctx
        self.text = text


def _get_paragraph_key(p: Paragraph) -> str:
    """Generate a stable unique key for paragraph deduplication."""
    try:
        # Use XML content hash + text content for stable deduplication
        xml_content = p._p.xml if hasattr(p._p, 'xml') else str(p._p)
        text_content = _p_text_with_breaks(p)
        combined = f"{hash(xml_content)}_{len(text_content)}_{text_content[:50]}"
        return combined
    except Exception:
        # Fallback to a simple text-based key
        text_content = _p_text_with_breaks(p)
        return f"fallback_{hash(text_content)}_{len(text_content)}"


def _collect_docx_segments(doc: docx.Document) -> List[Segment]:
    """
    Enhanced segment collector with improved stability.
    Handles paragraphs, tables, textboxes, and SDT Content Controls.
    """
    segs: List[Segment] = []
    seen_par_keys = set()

    def _add_paragraph(p: Paragraph, ctx: str):
        try:
            p_key = _get_paragraph_key(p)
            if p_key in seen_par_keys:
                return

            txt = _p_text_with_breaks(p)
            if txt.strip() and not _is_our_insert_block(p):
                segs.append(Segment("para", p, ctx, txt))
                seen_par_keys.add(p_key)
        except Exception as e:
            # Log the error but keep processing the rest of the document
            print(f"[WARNING] 段落處理錯誤: {e}, 跳過此段落")

    def _process_container_content(container, ctx: str):
        """
        Recursively process content within a container (body, cell, or SDT content).
        Identifies and handles paragraphs, tables, and SDT elements.
        """
        if container._element is None:
            return

        for child_element in container._element:
            qname = child_element.tag

            if qname.endswith('}p'):  # Paragraph
                p = Paragraph(child_element, container)
                _add_paragraph(p, ctx)

            elif qname.endswith('}tbl'):  # Table
                table = Table(child_element, container)
                for r_idx, row in enumerate(table.rows, 1):
                    for c_idx, cell in enumerate(row.cells, 1):
                        cell_ctx = f"{ctx} > Tbl(r{r_idx},c{c_idx})"
                        _process_container_content(cell, cell_ctx)

            elif qname.endswith('}sdt'):  # Structured Document Tag (SDT)
                sdt_ctx = f"{ctx} > SDT"

                # 1. Extract the SDT's metadata text (placeholder, dropdown items).
                # The 'w' namespace mapping is required for these XPath queries.
                ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}

                # Placeholder text
                placeholder_texts = []
                for t in child_element.xpath('.//w:placeholder//w:t', namespaces=ns):
                    if t.text:
                        placeholder_texts.append(t.text)
                if placeholder_texts:
                    full_placeholder = "".join(placeholder_texts).strip()
                    if full_placeholder:
                        # Treat the placeholder as a special paragraph. We cannot
                        # write a translation back into the placeholder itself, so
                        # the segment ref is the SDT element and the translation is
                        # appended to the end of the SDT content.
                        segs.append(Segment("para", child_element, f"{sdt_ctx}-Placeholder", full_placeholder))

                # Dropdown list items
                list_items = []
                for item in child_element.xpath('.//w:dropDownList/w:listItem', namespaces=ns):
                    display_text = item.get(qn('w:displayText'))
                    if display_text:
                        list_items.append(display_text)
                if list_items:
                    # Merge all options into one text block for translation
                    items_as_text = "\n".join(list_items)
                    segs.append(Segment("para", child_element, f"{sdt_ctx}-Dropdown", items_as_text))

                # 2. Recurse into the SDT's actual content (sdtContent)
                sdt_content_element = child_element.find(qn('w:sdtContent'))
                if sdt_content_element is not None:
                    # python-docx has no high-level SdtContent object, so wrap the
                    # XML element in a minimal stand-in that provides the
                    # ._element and ._parent attributes the recursion needs.
                    class SdtContentWrapper:
                        def __init__(self, element, parent):
                            self._element = element
                            self._parent = parent

                    sdt_content_wrapper = SdtContentWrapper(sdt_content_element, container)
                    _process_container_content(sdt_content_wrapper, sdt_ctx)

    # --- Main execution starts here ---

    # 1. Process the main document body
    _process_container_content(doc._body, "Body")

    # 2. Process textboxes
    for tx, s in _txbx_iter_texts(doc):
        if s.strip() and (_has_cjk(s) or should_translate(s, 'auto')):
            segs.append(Segment("txbx", tx, "TextBox", s))

    return segs

def _insert_docx_translations(doc: docx.Document, segs: List[Segment],
                              tmap: Dict[Tuple[str, str], str],
                              targets: List[str], log=lambda s: None) -> Tuple[int, int]:
    """
    Insert translations into DOCX document segments.

    Args:
        doc: The DOCX document object
        segs: List of segments to translate
        tmap: Translation map keyed by (target_language, source_text)
        targets: List of target languages in order
        log: Logging function

    Returns:
        Tuple of (successful_insertions, skipped_insertions)

    Key bug fix (segment filtering now uses the proper key format):
        OLD (incorrect): if (seg.kind, seg.text) not in tmap and (targets[0], seg.text) not in tmap
        NEW (correct):   has_any_translation = any((tgt, seg.text) in tmap for tgt in targets)
    """
    ok_cnt = skip_cnt = 0

    # Helper: add a formatted run (with line breaks and our marker) to a paragraph
    def _add_formatted_run(p: Paragraph, text: str, italic: bool, font_size_pt: int):
        lines = text.split("\n")
        for i, line in enumerate(lines):
            run = p.add_run(line)
            if italic: run.italic = True
            if font_size_pt: run.font.size = Pt(font_size_pt)
            if i < len(lines) - 1:
                run.add_break()
        # Add our zero-width space marker
        tag_run = p.add_run("\u200b")
        if italic: tag_run.italic = True
        if font_size_pt: tag_run.font.size = Pt(font_size_pt)

    for seg in segs:
        # Check whether any target language has a translation for this segment
        has_any_translation = any((tgt, seg.text) in tmap for tgt in targets)
        if not has_any_translation:
            log(f"[SKIP] 無翻譯結果: {seg.ctx} | {seg.text[:50]}...")
            continue

        # Get translations for all targets, with a fallback for missing ones
        translations = []
        for tgt in targets:
            if (tgt, seg.text) in tmap:
                translations.append(tmap[(tgt, seg.text)])
            else:
                log(f"[WARNING] 缺少 {tgt} 翻譯: {seg.text[:30]}...")
                translations.append(f"【翻譯查詢失敗|{tgt}】{seg.text[:50]}...")

        log(f"[INSERT] 準備插入 {len(translations)} 個翻譯到 {seg.ctx}: {seg.text[:30]}...")

        if seg.kind == "para":
            # Check if this is an SDT segment (ref is an XML element, not a Paragraph)
            if hasattr(seg.ref, 'tag') and seg.ref.tag.endswith('}sdt'):
                # Handle SDT segments: insert translations into sdtContent
                sdt_element = seg.ref
                ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
                sdt_content = sdt_element.find(qn('w:sdtContent'))

                if sdt_content is not None:
                    # Collect translations we inserted on a previous run
                    existing_paras = sdt_content.xpath('.//w:p', namespaces=ns)
                    existing_texts = []
                    for ep in existing_paras:
                        p_obj = Paragraph(ep, None)
                        if _is_our_insert_block(p_obj):
                            existing_texts.append(_p_text_with_breaks(p_obj))

                    # Skip if all translations already exist
                    if len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] SDT 已存在翻譯: {seg.text[:30]}...")
                            continue

                    # Add missing translations to the SDT content
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            new_p_element = OxmlElement("w:p")
                            sdt_content.append(new_p_element)
                            new_p = Paragraph(new_p_element, None)
                            _add_formatted_run(new_p, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)

                    ok_cnt += 1
                    log(f"[SUCCESS] SDT 插入 {len(translations)} 個翻譯")
                continue

            p: Paragraph = seg.ref

            # --- Context-aware insertion ---
            # If the paragraph's parent is a table cell, insert within the cell
            if isinstance(p._parent, _Cell):
                cell = p._parent

                try:
                    # Find the current paragraph's position in the cell
                    cell_paragraphs = list(cell.paragraphs)
                    p_index = -1
                    for idx, cell_p in enumerate(cell_paragraphs):
                        if cell_p._element == p._element:
                            p_index = idx
                            break

                    if p_index == -1:
                        log(f"[WARNING] 無法找到段落在單元格中的位置,使用原始方法")
                        # Fallback: append at the end of the cell
                        for block in translations:
                            new_p = cell.add_paragraph()
                            _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                        ok_cnt += 1
                        continue

                    # Collect our translations already present right after this paragraph
                    existing_texts = []
                    check_limit = min(p_index + 1 + len(translations), len(cell_paragraphs))
                    for idx in range(p_index + 1, check_limit):
                        if _is_our_insert_block(cell_paragraphs[idx]):
                            existing_texts.append(_p_text_with_breaks(cell_paragraphs[idx]))

                    # Skip if all translations already exist in order
                    if len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] 表格單元格已存在翻譯: {seg.text[:30]}...")
                            continue

                    # Determine which translations still need to be added
                    to_add = []
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            to_add.append(t)

                    if not to_add:
                        skip_cnt += 1
                        log(f"[SKIP] 表格單元格所有翻譯已存在: {seg.text[:30]}...")
                        continue

                    # Insert new paragraphs right after the current paragraph
                    insert_after = p
                    for block in to_add:
                        try:
                            new_p_element = OxmlElement("w:p")
                            insert_after._element.addnext(new_p_element)
                            new_p = Paragraph(new_p_element, cell)
                            _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                            insert_after = new_p  # update position for the next insertion
                        except Exception as e:
                            log(f"[ERROR] 表格插入失敗: {e}, 嘗試fallback方法")
                            # Fallback: add at the end of the cell
                            try:
                                new_p = cell.add_paragraph()
                                _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                                log(f"[SUCCESS] Fallback插入成功")
                            except Exception as e2:
                                log(f"[FATAL] Fallback也失敗: {e2}")
                                continue
                    ok_cnt += 1
                    log(f"[SUCCESS] 表格單元格插入 {len(to_add)} 個翻譯(緊接原文後)")

                except Exception as e:
                    log(f"[ERROR] 表格處理全面失敗: {e}, 跳過此段落")
                    continue

            else:  # Original logic for top-level paragraphs
                try:
                    existing_texts = _scan_our_tail_texts(p, limit=max(len(translations), 4))
                    if existing_texts and len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] 段落已存在翻譯: {seg.text[:30]}...")
                            continue

                    to_add = []
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            to_add.append(t)

                    if not to_add:
                        skip_cnt += 1
                        log(f"[SKIP] 段落所有翻譯已存在: {seg.text[:30]}...")
                        continue

                    last = _find_last_inserted_after(p, limit=max(len(translations), 4))
                    anchor = last if last else p

                    for block in to_add:
                        try:
                            anchor = _append_after(anchor, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                        except Exception as e:
                            log(f"[ERROR] 段落插入失敗: {e}, 嘗試簡化插入")
                            try:
                                # Fallback: simple append at the end of the parent container
                                new_p = p._parent.add_paragraph(block)
                                if new_p.runs:
                                    new_p.runs[0].italic = True
                                log(f"[SUCCESS] 簡化插入成功")
                            except Exception as e2:
                                log(f"[FATAL] 簡化插入也失敗: {e2}")
                                continue
                    ok_cnt += 1
                    log(f"[SUCCESS] 段落插入 {len(to_add)} 個翻譯")

                except Exception as e:
                    log(f"[ERROR] 段落處理全面失敗: {e}, 跳過此段落")
                    continue

        elif seg.kind == "txbx":
            tx = seg.ref
            if _txbx_tail_equals(tx, translations):
                skip_cnt += 1; continue

            paras = tx.xpath("./*[local-name()='p']")
            tail_texts = []
            scan = paras[-max(len(translations), 4):] if len(paras) else []
            for q in scan:
                has_zero = any(((t.text or "").find("\u200b") >= 0) for t in q.xpath(".//*[local-name()='t']"))
                if has_zero:
                    qtxt = "".join([(node.text or "") for node in q.xpath(".//*[local-name()='t' or local-name()='br']")]).strip()
                    tail_texts.append(qtxt)

            to_add = []
            for t in translations:
                if not any(_normalize_text(t) == _normalize_text(e) for e in tail_texts):
                    to_add.append(t)

            if not to_add:
                skip_cnt += 1; continue

            for block in to_add:
                _txbx_append_paragraph(tx, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
            ok_cnt += 1

    log(f"[DOCX] 插入完成:成功 {ok_cnt} 段、略過 {skip_cnt} 段(已存在/只補缺)")
    return ok_cnt, skip_cnt

def translate_docx(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str],
                   cache: TranslationCache, client: DifyClient, include_headers_shapes_via_com: bool,
                   log=lambda s: None):
    from shutil import copyfile
    copyfile(in_path, out_path)
    doc = docx.Document(out_path)

    # Health check
    ok, msg = client.health_check()
    log(f"[API Health] {msg}")
    if not ok:
        raise ApiError("API 無法連線或未授權。請檢查 base_url / api。")

    # Phase 1: collect
    segs = _collect_docx_segments(doc)
    log(f"[DOCX] 待翻譯段/方塊總數:{len(segs)}")

    # Phase 2: translate unique texts
    uniq_texts = [t for t in sorted(set(s.text for s in segs)) if should_translate(t, (src_lang or 'auto'))]
    tmap: Dict[Tuple[str, str], str] = {}
    total = len(uniq_texts) * len(targets)
    done = 0; fail_cnt = 0
    for txt in uniq_texts:
        for tgt in targets:
            done += 1
            preview = (txt.replace("\n", " ")[:40] + "..." if len(txt) > 40 else txt)
            log(f"[TR] {done}/{total} {tgt} len={len(txt)} 「{preview}」")
            if SENTENCE_MODE:
                ok1, res = translate_block_sentencewise(txt, tgt, src_lang, cache, client)
            else:
                ok1, res = client.translate_once(txt, tgt, src_lang)
                if not ok1: res = f"【翻譯失敗|{tgt}】{txt}"
            if not ok1: fail_cnt += 1
            src_key = (src_lang or "auto").lower()
            if SENTENCE_MODE and ok1:
                cache.put(src_key, tgt, txt, res)
            tmap[(tgt, txt)] = res
    if fail_cnt:
        log(f"[DOCX] 翻譯失敗 {fail_cnt} 筆(以占位文寫回)")

    # Phase 3: insert
    _insert_docx_translations(doc, segs, tmap, targets, log=log)

    # Save docx
    doc.save(out_path)
    log(f"[DOCX] 輸出:{os.path.basename(out_path)}")

    # Header/footer shapes via Word COM, only when requested
    if include_headers_shapes_via_com and _WIN32COM_AVAILABLE:
        postprocess_docx_shapes_with_word(out_path, targets, src_lang, cache, client,
                                          include_headers=True, log=log)
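
# End-to-end sketch for a single file (paths and languages are illustrative):
#
#     cache = TranslationCache(Path("out/translation_cache.db"))
#     client = OllamaClient(log=print)
#     translate_docx("report.docx", "out/report_translated.docx",
#                    targets=["English", "Vietnamese"], src_lang=None,
#                    cache=cache, client=client,
#                    include_headers_shapes_via_com=False, log=print)
#
# Phase 1 collects segments, phase 2 translates each unique text once per
# target, and phase 3 inserts the results after their source paragraphs.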

# ---------- Windows COM helpers (optional, headers/footers only) ----------
def _com_iter(coll):
    try: count = coll.Count
    except Exception: return
    for i in range(1, count + 1): yield coll.Item(i)


def _word_convert(input_path: str, output_path: str, target_format: int):
    if not _WIN32COM_AVAILABLE: raise RuntimeError("Word COM not available")
    pythoncom.CoInitialize()
    word = None
    try:
        word = win32.Dispatch("Word.Application"); word.Visible = False
        doc = word.Documents.Open(os.path.abspath(input_path))
        doc.SaveAs2(os.path.abspath(output_path), FileFormat=target_format)
        doc.Close(False)
    finally:
        # Guard against Dispatch() itself failing, which would leave `word` unset
        if word is not None: word.Quit()
        pythoncom.CoUninitialize()


def _excel_convert(input_path: str, output_path: str):
    if not _WIN32COM_AVAILABLE: raise RuntimeError("Excel COM not available")
    pythoncom.CoInitialize()
    excel = None
    try:
        excel = win32.Dispatch("Excel.Application"); excel.Visible = False
        try: excel.DisplayAlerts = False
        except Exception: pass
        wb = excel.Workbooks.Open(os.path.abspath(input_path))
        wb.SaveAs(os.path.abspath(output_path), FileFormat=51)  # 51 = xlOpenXMLWorkbook (.xlsx)
        wb.Close(SaveChanges=False)
    finally:
        if excel is not None: excel.Quit()
        pythoncom.CoUninitialize()


def postprocess_docx_shapes_with_word(docx_path: str, targets: List[str], src_lang: Optional[str],
                                      cache: TranslationCache, client: DifyClient,
                                      include_headers: bool = False, log=lambda s: None):
    # Only when explicitly requested, and headers/footers only
    if not _WIN32COM_AVAILABLE or not include_headers:
        return
    pythoncom.CoInitialize()
    word = None
    try:
        word = win32.Dispatch("Word.Application"); word.Visible = False
        try: word.ScreenUpdating = False
        except Exception: pass
        try: word.DisplayAlerts = 0
        except Exception: pass
        doc = word.Documents.Open(os.path.abspath(docx_path))

        def _proc_shapes(shapes):
            for shp in _com_iter(shapes):
                try:
                    tf = getattr(shp, "TextFrame", None)
                    if tf and getattr(tf, "HasText", False):
                        src = tf.TextRange.Text
                        if not src or not src.strip(): continue
                        if len(src) > MAX_SHAPE_CHARS:
                            log(f"[Skip shape] too long ({len(src)} chars)"); continue
                        blocks = []
                        for tgt in targets:
                            ok, tr = translate_block_sentencewise(src, tgt, src_lang, cache, client)
                            if not ok: tr = f"【翻譯失敗|{tgt}】{src}"
                            blocks.append(tr)
                        suffix = "\r" + "\r".join(blocks)
                        full = tf.TextRange.Text or ""
                        if _normalize_text(full[-len(suffix):]) == _normalize_text(suffix):
                            continue
                        tf.TextRange.InsertAfter(suffix)
                        try:
                            dup = tf.TextRange.Duplicate
                            start = len(full) + 1; end = dup.Characters.Count
                            dup.SetRange(start, end); dup.Font.Italic = True
                        except Exception: pass
                except Exception as e:
                    log(f"[COM shape error] {e}")

        # headers/footers only
        for sec in _com_iter(doc.Sections):
            try:
                _proc_shapes(sec.Headers(c.wdHeaderFooterPrimary).Shapes)
                _proc_shapes(sec.Headers(c.wdHeaderFooterFirstPage).Shapes)
                _proc_shapes(sec.Headers(c.wdHeaderFooterEvenPages).Shapes)
                _proc_shapes(sec.Footers(c.wdHeaderFooterPrimary).Shapes)
                _proc_shapes(sec.Footers(c.wdHeaderFooterFirstPage).Shapes)
                _proc_shapes(sec.Footers(c.wdHeaderFooterEvenPages).Shapes)
            except Exception: pass

        doc.Save(); doc.Close(False)
    finally:
        try: word.ScreenUpdating = True
        except Exception: pass
        if word is not None: word.Quit()
        pythoncom.CoUninitialize()

# ---------- PPTX ----------
def _ppt_text_of_tf(tf) -> str:
    return "\n".join([p.text for p in tf.paragraphs])


def _ppt_tail_equals(tf, translations: List[str]) -> bool:
    if len(tf.paragraphs) < len(translations): return False
    tail = tf.paragraphs[-len(translations):]
    for para, expect in zip(tail, translations):
        if _normalize_text(para.text) != _normalize_text(expect): return False
        if any((r.font.italic is not True) and (r.text or "").strip() for r in para.runs): return False
    return True


def _ppt_append(tf, text_block: str):
    p = tf.add_paragraph()
    p.text = text_block
    for r in p.runs:
        r.font.italic = True
        r.font.size = PPTPt(12)


def translate_pptx(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str],
                   cache: TranslationCache, client: DifyClient, log=lambda s: None):
    prs = pptx.Presentation(in_path)
    segs = []
    for slide in prs.slides:
        for sh in slide.shapes:
            if not getattr(sh, "has_text_frame", False): continue
            tf = sh.text_frame
            txt = _ppt_text_of_tf(tf)
            if txt.strip():
                segs.append((tf, txt))
    log(f"[PPTX] 待翻譯區塊:{len(segs)}")
    uniq = [s for s in sorted(set(s for _, s in segs)) if should_translate(s, (src_lang or 'auto'))]
    tmap: Dict[Tuple[str, str], str] = {}
    for s in uniq:
        for tgt in targets:
            ok, res = translate_block_sentencewise(s, tgt, src_lang, cache, client)
            if not ok: res = f"【翻譯失敗|{tgt}】{s}"
            tmap[(tgt, s)] = res
    ok_cnt = skip_cnt = 0
    for tf, s in segs:
        # Texts filtered out by should_translate() have no tmap entries; skip
        # them instead of raising KeyError below.
        if any((tgt, s) not in tmap for tgt in targets):
            skip_cnt += 1; continue
        trs = [tmap[(tgt, s)] for tgt in targets]
        if _ppt_tail_equals(tf, trs):
            skip_cnt += 1; continue
        for block in trs: _ppt_append(tf, block)
        ok_cnt += 1
    prs.save(out_path)
    log(f"[PPTX] 插入完成:成功 {ok_cnt}、略過 {skip_cnt} → {os.path.basename(out_path)}")

# ---------- XLSX/XLS ----------
def _get_display_text_for_translation(ws, ws_vals, r: int, c: int) -> Optional[str]:
    val = ws.cell(row=r, column=c).value
    if isinstance(val, str) and val.startswith("="):
        if ws_vals is not None:
            shown = ws_vals.cell(row=r, column=c).value
            return shown if isinstance(shown, str) and shown.strip() else None
        return None
    if isinstance(val, str) and val.strip():
        return val
    if ws_vals is not None:
        shown = ws_vals.cell(row=r, column=c).value
        if isinstance(shown, str) and shown.strip():
            return shown
    return None
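
# Decision table (illustrative):
#
#     cell value          cached display value   returned text
#     "=CONCAT(A1,B1)"    "Hello World"          "Hello World"  (formula: use cached result)
#     "=SUM(A1:A9)"       42 (non-string)        None           (nothing translatable)
#     "Tổng cộng"         (any)                  "Tổng cộng"    (plain string wins)
#     None                "shown text"           "shown text"   (fall back to cached value)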

def translate_xlsx_xls(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str],
                       cache: TranslationCache, client: DifyClient,
                       excel_formula_mode: str = EXCEL_FORMULA_MODE, log=lambda s: None):
    ext = Path(in_path).suffix.lower()
    out_xlsx = Path(out_path).with_suffix(".xlsx")
    if ext == ".xls":
        # openpyxl cannot read legacy .xls, so conversion via Excel COM is required
        if not _WIN32COM_AVAILABLE:
            raise RuntimeError(".xls requires Excel COM (Windows); please convert to .xlsx first")
        tmp = str(Path(out_path).with_suffix("")) + "__from_xls.xlsx"
        try:
            log("[XLS] 使用 Excel COM 轉檔為 .xlsx …")
            _excel_convert(in_path, tmp)
            translate_xlsx_xls(tmp, out_path, targets, src_lang, cache, client,
                               excel_formula_mode=excel_formula_mode, log=log)
        finally:
            try: os.remove(tmp)
            except Exception: pass
        return
    if ext != ".xlsx":
        raise RuntimeError("Unsupported Excel type")
    wb = openpyxl.load_workbook(in_path, data_only=False)
    try:
        wb_vals = openpyxl.load_workbook(in_path, data_only=True)
    except Exception:
        wb_vals = None
    segs = []
    for ws in wb.worksheets:
        ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None
        max_row, max_col = ws.max_row, ws.max_column
        for r in range(1, max_row + 1):
            for c in range(1, max_col + 1):
                src_text = _get_display_text_for_translation(ws, ws_vals, r, c)
                if not src_text: continue
                if not should_translate(src_text, (src_lang or 'auto')): continue
                val = ws.cell(row=r, column=c).value
                is_formula = isinstance(val, str) and val.startswith("=")
                segs.append((ws.title, r, c, src_text, is_formula))
    log(f"[Excel] 待翻譯儲存格:{len(segs)}")
    uniq = sorted(set(s[3] for s in segs))
    tmap: Dict[Tuple[str, str], str] = {}
    for text in uniq:
        for tgt in targets:
            ok, res = translate_block_sentencewise(text, tgt, src_lang, cache, client)
            if not ok: res = f"【翻譯失敗|{tgt}】{text}"
            tmap[(tgt, text)] = res
    for sheet_name, r, c, src_text, is_formula in segs:
        ws = wb[sheet_name]
        trs = [tmap[(tgt, src_text)] for tgt in targets]
        if is_formula:
            if excel_formula_mode == "skip":
                continue
            elif excel_formula_mode == "comment":
                txt_comment = "\n".join([f"[{t}] {res}" for t, res in zip(targets, trs)])
                cell = ws.cell(row=r, column=c)
                exist = cell.comment
                if not exist or _normalize_text(exist.text) != _normalize_text(txt_comment):
                    cell.comment = Comment(txt_comment, "translator")
                continue
            else:
                continue
        combined = "\n".join([src_text] + trs)
        cell = ws.cell(row=r, column=c)
        if isinstance(cell.value, str) and _normalize_text(cell.value) == _normalize_text(combined):
            continue
        cell.value = combined
        try:
            if cell.alignment:
                cell.alignment = Alignment(horizontal=cell.alignment.horizontal,
                                           vertical=cell.alignment.vertical,
                                           wrap_text=True)
            else:
                cell.alignment = Alignment(wrap_text=True)
        except Exception:
            cell.alignment = Alignment(wrap_text=True)
    wb.save(out_xlsx)
    log(f"[Excel] 輸出:{out_xlsx.name}")

# ---------- PDF ----------
def translate_pdf(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str],
                  cache: TranslationCache, client: DifyClient, log=lambda s: None):
    temp_docx = str(Path(out_path).with_suffix("")) + "__from_pdf.docx"
    if _WIN32COM_AVAILABLE:
        try:
            # 16 = wdFormatDocumentDefault (.docx); Word can open many PDFs directly
            _word_convert(in_path, temp_docx, 16)
            translate_docx(temp_docx, out_path, targets, src_lang, cache, client,
                           include_headers_shapes_via_com=False, log=log)
            try: os.remove(temp_docx)
            except Exception: pass
            return
        except Exception as e:
            log(f"[PDF] Word import failed, fallback to text extract: {e}")
    doc = docx.Document()
    try:
        reader = PdfReader(in_path)
        for i, page in enumerate(reader.pages, start=1):
            doc.add_heading(f"— Page {i} —", level=1)
            text = page.extract_text() or ""
            if text.strip():
                doc.add_paragraph(text)
                for tgt in targets:
                    ok, tr = translate_block_sentencewise(text, tgt, src_lang, cache, client)
                    if not ok: tr = f"【翻譯失敗|{tgt}】{text}"
                    p = doc.add_paragraph("")
                    lines = tr.split("\n")
                    for j, line in enumerate(lines):
                        r = p.add_run(line); r.italic = True; r.font.size = Pt(INSERT_FONT_SIZE_PT)
                        if j < len(lines) - 1: r.add_break()
                    tag = p.add_run("\u200b"); tag.italic = True; tag.font.size = Pt(INSERT_FONT_SIZE_PT)
            else:
                doc.add_paragraph("[Empty or image-only page]")
    except Exception as e:
        doc.add_paragraph(f"[PDF extract error] {e}")
    doc.save(out_path)
    log(f"[PDF] 輸出(docx 報告):{os.path.basename(out_path)}")

# ---------- Orchestrator ----------
def process_path(input_path: Path, output_dir: Path, targets: List[str], src_lang: Optional[str],
                 base_url: str, api_key: str, cache: TranslationCache, recurse: bool,
                 include_headers_shapes_via_com: bool, backend: str = 'Ollama',
                 ollama_model: str = 'gpt-oss:latest', log=lambda s: None):
    if not input_path.exists():
        raise FileNotFoundError(f"Input not found: {input_path}")
    output_dir.mkdir(parents=True, exist_ok=True)
    client = (DifyClient(base_url, api_key, log=log) if backend.lower() == "dify"
              else OllamaClient(model=ollama_model, log=log))
    files: List[Path]
    if input_path.is_dir():
        candidates = input_path.rglob("*") if recurse else input_path.glob("*")
        files = [p for p in candidates if p.is_file() and p.suffix.lower() in SUPPORTED]
        log(f"[Folder] 掃描到 {len(files)} 個支援檔案")
    else:
        files = [input_path] if input_path.suffix.lower() in SUPPORTED else []
        if not files: log("Selected file type is not supported."); return
    for src in files:
        ext = src.suffix.lower()
        stem = src.stem
        # .doc and .pdf are converted, so their outputs are .docx; other types keep their extension
        out_name = f"{stem}_translated{ext if ext in ('.docx', '.pptx', '.xlsx') else ('.docx' if ext in ('.doc', '.pdf') else ext)}"
        out_path = output_dir / out_name
        log("=" * 24); log(f"處理:{src.name}")
        try:
            if ext == ".docx":
                translate_docx(str(src), str(out_path), targets, src_lang, cache, client,
                               include_headers_shapes_via_com=include_headers_shapes_via_com, log=log)
            elif ext == ".doc":
                tmp_docx = str(output_dir / f"{stem}__tmp.docx")
                if _WIN32COM_AVAILABLE:
                    _word_convert(str(src), tmp_docx, 16)
                    translate_docx(tmp_docx, str(out_path), targets, src_lang, cache, client,
                                   include_headers_shapes_via_com=include_headers_shapes_via_com, log=log)
                    try: os.remove(tmp_docx)
                    except Exception: pass
                else:
                    log("[DOC] 無法使用 Word COM,請先轉為 .docx")
            elif ext == ".pptx":
                translate_pptx(str(src), str(out_path), targets, src_lang, cache, client, log=log)
            elif ext in (".xlsx", ".xls"):
                translate_xlsx_xls(str(src), str(out_path), targets, src_lang, cache, client, log=log)
            elif ext == ".pdf":
                translate_pdf(str(src), str(out_path), targets, src_lang, cache, client, log=log)
            log(f"完成:{src.name} → {out_path.name}")
        except ApiError as e:
            log(f"[FATAL] {src.name}: {e}")
        except Exception as e:
            log(f"[FATAL] {src.name}: {e}")

# ---------- GUI ----------
import tkinter as tk
from tkinter import ttk, filedialog, messagebox

COMMON_LANGS = [
    "English", "Vietnamese", "Traditional Chinese", "Simplified Chinese", "Japanese", "Korean",
    "Thai", "Indonesian", "French", "German", "Spanish", "Portuguese", "Italian", "Russian", "Arabic", "Hindi"
]


class TranslatorGUI(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title(APP_TITLE); self.geometry("1040x900")
        self.stop_flag = threading.Event()
        self.worker_thread: Optional[threading.Thread] = None
        self.log_queue: "queue.Queue[str]" = queue.Queue()
        self.cache: Optional[TranslationCache] = None
        self._build_ui()
        load_api_config_from_file()
        if DIFY_API_BASE_URL: self.base_url_var.set(DIFY_API_BASE_URL)
        if DIFY_API_KEY: self.api_key_var.set(DIFY_API_KEY)
        try:
            self._refresh_ollama_models()
        except Exception:
            pass
        self.after(100, self._drain_log_queue)

    def _build_ui(self):
        pad = {"padx": 8, "pady": 4}

        # Paths
        frm_path = ttk.LabelFrame(self, text="Paths"); frm_path.pack(fill="x", **pad)
        self.input_mode_var = tk.StringVar(value="file")
        self.in_path_var = tk.StringVar()
        self.out_dir_var = tk.StringVar(value=DEFAULT_OUTPUT_DIR)
        self.recurse_var = tk.BooleanVar(value=True)
        ttk.Radiobutton(frm_path, text="Single File", value="file", variable=self.input_mode_var).grid(row=0, column=0, sticky="w")
        ttk.Radiobutton(frm_path, text="Folder", value="folder", variable=self.input_mode_var).grid(row=0, column=1, sticky="w")
        ttk.Label(frm_path, text="Input path:").grid(row=1, column=0, sticky="w")
        ttk.Entry(frm_path, textvariable=self.in_path_var, width=74).grid(row=1, column=1, sticky="we")
        ttk.Button(frm_path, text="Browse...", command=self._browse_input).grid(row=1, column=2, sticky="e")
        ttk.Checkbutton(frm_path, text="Recurse subfolders (folder mode)", variable=self.recurse_var).grid(row=2, column=1, sticky="w")
        ttk.Label(frm_path, text="Output folder:").grid(row=3, column=0, sticky="w")
        ttk.Entry(frm_path, textvariable=self.out_dir_var, width=74).grid(row=3, column=1, sticky="we")
        ttk.Button(frm_path, text="Browse...", command=self._browse_output).grid(row=3, column=2, sticky="e")
        frm_path.columnconfigure(1, weight=1)

        # API
        frm_api = ttk.LabelFrame(self, text="Backend & API"); frm_api.pack(fill="x", **pad)
        self.backend_var = tk.StringVar(value="Ollama")
        ttk.Label(frm_api, text="Backend:").grid(row=0, column=0, sticky="w")
        ttk.Combobox(frm_api, textvariable=self.backend_var, values=["Ollama", "Dify"], width=18, state="readonly").grid(row=0, column=1, sticky="w")
        # Dify settings
        self.base_url_var = tk.StringVar(); self.api_key_var = tk.StringVar()
        ttk.Label(frm_api, text="Dify Base URL:").grid(row=1, column=0, sticky="w")
        ttk.Entry(frm_api, textvariable=self.base_url_var, width=60).grid(row=1, column=1, sticky="we")
        ttk.Label(frm_api, text="Dify API Key:").grid(row=2, column=0, sticky="w")
        ttk.Entry(frm_api, textvariable=self.api_key_var, width=60, show="•").grid(row=2, column=1, sticky="we")
        # Ollama model
        self.ollama_model_var = tk.StringVar(value="gpt-oss:latest")
        ttk.Label(frm_api, text="Ollama Model:").grid(row=3, column=0, sticky="w")
        self.cmb_ollama = ttk.Combobox(frm_api, textvariable=self.ollama_model_var, values=[], width=40)
        self.cmb_ollama.grid(row=3, column=1, sticky="w")
        ttk.Button(frm_api, text="Refresh Models", command=self._refresh_ollama_models).grid(row=3, column=2, sticky="w")
        frm_api.columnconfigure(1, weight=1)

        # Languages & Order
        frm_lang = ttk.LabelFrame(self, text="Languages & Order"); frm_lang.pack(fill="x", **pad)
        self.src_lang_var = tk.StringVar(value="Auto")
        ttk.Label(frm_lang, text="Source:").grid(row=0, column=0, sticky="w")
        ttk.Combobox(frm_lang, textvariable=self.src_lang_var,
                     values=["Auto"] + COMMON_LANGS, width=24, state="readonly").grid(row=0, column=1, sticky="w")
        ttk.Label(frm_lang, text="Targets (select & reorder):").grid(row=1, column=0, sticky="nw")
        self.lst_targets = tk.Listbox(frm_lang, selectmode="extended", height=10, exportselection=False)
        for lang in COMMON_LANGS:
            self.lst_targets.insert(tk.END, lang)
        # Default selection: English and Vietnamese
        self.lst_targets.selection_set(0)
        self.lst_targets.selection_set(1)
        self.lst_targets.grid(row=1, column=1, sticky="we")
        frm_lang.columnconfigure(1, weight=1)

        side = ttk.Frame(frm_lang); side.grid(row=1, column=2, sticky="nsw", padx=8)
        ttk.Button(side, text="▲ Move Up", command=self._move_up).pack(anchor="w", pady=(0, 4))
        ttk.Button(side, text="▼ Move Down", command=self._move_down).pack(anchor="w", pady=(0, 8))
        ttk.Label(side, text="Hint: 順序 = 輸出順序").pack(anchor="w")

        self.sel_summary_var = tk.StringVar(value="Selected: English, Vietnamese")
        ttk.Label(frm_lang, textvariable=self.sel_summary_var).grid(row=2, column=1, sticky="w", pady=(6, 0))

        # Options
        frm_opt = ttk.LabelFrame(self, text="Options"); frm_opt.pack(fill="x", **pad)
        self.include_headers_var = tk.BooleanVar(value=False)
        ttk.Checkbutton(frm_opt, text="Include headers/footers Shapes via Word COM (Windows only)",
                        variable=self.include_headers_var).grid(row=0, column=0, sticky="w")

        # Controls
        frm_ctl = ttk.Frame(self); frm_ctl.pack(fill="x", **pad)
        ttk.Button(frm_ctl, text="Start", command=self._on_start).pack(side="left", padx=4)
        ttk.Button(frm_ctl, text="Resume", command=self._on_resume).pack(side="left", padx=4)
        ttk.Button(frm_ctl, text="Stop", command=self._on_stop).pack(side="left", padx=4)
        ttk.Button(frm_ctl, text="Clear Log", command=self._clear_log).pack(side="left", padx=4)

        # Log
        frm_log = ttk.LabelFrame(self, text="Log"); frm_log.pack(fill="both", expand=True, **pad)
        self.txt_log = tk.Text(frm_log, wrap="word", height=22); self.txt_log.pack(fill="both", expand=True)

        self.lst_targets.bind("<<ListboxSelect>>", lambda e: self._update_target_summary())

    # --- UI helpers ---
    def _browse_input(self):
        if self.input_mode_var.get() == "file":
            p = filedialog.askopenfilename(
                title="Choose a file",
                filetypes=[("Supported", "*.docx *.doc *.pptx *.xlsx *.xls *.pdf"), ("All files", "*.*")]
            )
        else:
            p = filedialog.askdirectory(title="Choose a folder")
        if p: self.in_path_var.set(p)

    def _browse_output(self):
        p = filedialog.askdirectory(title="Choose output folder")
        if p: self.out_dir_var.set(p)

    def _log(self, s: str):
        self.log_queue.put(s)

    def _drain_log_queue(self):
        try:
            while True:
                s = self.log_queue.get_nowait()
                self.txt_log.insert(tk.END, s + "\n"); self.txt_log.see(tk.END)
        except queue.Empty:
            pass
        self.after(120, self._drain_log_queue)

    def _collect_targets(self) -> List[str]:
        sel = set(self.lst_targets.curselection())
        return [self.lst_targets.get(i) for i in range(self.lst_targets.size()) if i in sel]

    def _update_target_summary(self):
        tgts = self._collect_targets()
        self.sel_summary_var.set("Selected: " + (", ".join(tgts) if tgts else "(none)"))

    def _move_up(self):
        sel = list(self.lst_targets.curselection())
        if not sel: return
        for idx in sel:
            if idx == 0: continue
            text = self.lst_targets.get(idx)
            self.lst_targets.delete(idx)
            self.lst_targets.insert(idx - 1, text)
            self.lst_targets.selection_set(idx - 1)
        self._update_target_summary()

    def _move_down(self):
        sel = list(self.lst_targets.curselection())
        if not sel: return
        for idx in reversed(sel):
            if idx == self.lst_targets.size() - 1: continue
            text = self.lst_targets.get(idx)
            self.lst_targets.delete(idx)
            self.lst_targets.insert(idx + 1, text)
            self.lst_targets.selection_set(idx + 1)
        self._update_target_summary()

    def _refresh_ollama_models(self):
        try:
            models = list_ollama_models()
            if models:
                self.cmb_ollama['values'] = models
                if self.ollama_model_var.get() not in models:
                    self.ollama_model_var.set(models[0])
                self._log(f"[Ollama] Models: {', '.join(models)}")
            else:
                self._log("[Ollama] No models found.")
        except Exception as e:
            self._log(f"[Ollama] List models failed: {e}")

    def _start_worker(self, resume: bool = False):
        base = self.base_url_var.get().strip().rstrip("/")
        key = self.api_key_var.get().strip()
        backend = self.backend_var.get().strip()
        if backend == 'Dify' and (not base or not key):
            messagebox.showerror("API", "Please set Dify Base URL and API Key."); return
        targets = self._collect_targets()
        if not targets:
            messagebox.showerror("Targets", "Please choose at least one target language."); return
        in_path = Path(self.in_path_var.get().strip())
        if not in_path.exists():
            messagebox.showerror("Input", "Input path does not exist."); return
        out_dir = Path(self.out_dir_var.get().strip() or DEFAULT_OUTPUT_DIR)
        out_dir.mkdir(parents=True, exist_ok=True)
        if self.cache is None:
            self.cache = TranslationCache(out_dir / "translation_cache.db")
        include_headers = bool(self.include_headers_var.get())
        recurse = bool(self.recurse_var.get())
        src_sel = self.src_lang_var.get().strip()
        src_lang = None if src_sel.lower() == "auto" else src_sel
        self._log(f"Targets (order): {', '.join(targets)}")
        self._log(f"Input: {in_path}")
        self._log(f"Output: {out_dir}")
        self._log(f"Include header/footer shapes via COM: {include_headers and _WIN32COM_AVAILABLE}")

        def work():
            try:
                process_path(in_path, out_dir, targets, src_lang, base, key, self.cache,
                             recurse=recurse, include_headers_shapes_via_com=include_headers,
                             backend=backend, ollama_model=self.ollama_model_var.get(), log=self._log)
            except Exception as e:
                self._log(f"[Worker error] {e}")
            finally:
                self._log("Task finished.")

        if self.worker_thread and self.worker_thread.is_alive():
            messagebox.showinfo("Running", "Task is already running."); return
        self.worker_thread = threading.Thread(target=work, daemon=True)
        self.worker_thread.start()

    def _on_start(self):
        self.txt_log.insert(tk.END, "== Start ==\n")
        self._start_worker(resume=False)

    def _on_resume(self):
        self.txt_log.insert(tk.END, "== Resume ==\n")
        self._start_worker(resume=True)

    def _on_stop(self):
        # Advisory only: the flag is set, but the worker does not poll it yet
        self.stop_flag.set()
        self._log("Stop requested (new files won't start).")

    def _clear_log(self):
        self.txt_log.delete("1.0", tk.END)

    def on_close(self):
        try:
            if self.cache: self.cache.close()
        except Exception: pass
        self.destroy()

# ---------- Main ----------
def main():
    app = TranslatorGUI()
    app.protocol("WM_DELETE_WINDOW", app.on_close)
    app.mainloop()


if __name__ == "__main__":
    if len(sys.argv) == 1:
        main()
    else:
        load_api_config_from_file()
        if len(sys.argv) < 4:
            print("用法: python document_translator_gui.py <檔案或資料夾> <輸出資料夾> <目標語言以逗號分隔> [--headers]")
            sys.exit(1)
        inp = Path(sys.argv[1]); outd = Path(sys.argv[2]); tgts = [t.strip() for t in sys.argv[3].split(",")]
        include_headers = ("--headers" in sys.argv)
        outd.mkdir(parents=True, exist_ok=True)
        cache = TranslationCache(outd / "translation_cache.db")
        try:
            process_path(inp, outd, tgts, src_lang=None, base_url=DIFY_API_BASE_URL.strip().rstrip("/"),
                         api_key=DIFY_API_KEY.strip(), cache=cache, recurse=True,
                         include_headers_shapes_via_com=include_headers, log=lambda s: print(s))
        finally:
            cache.close()