#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Document Translator - Robust GUI (Dify) - Dedup + soft-skip + only-supplement insertion - Full paragraph discovery (tables/SDT/lists/nested) with textbox exclusion - TextBox deep parse with safe filtering (skip our inserted translations) - Orderable target languages (GUI) - Word COM only for header/footer shapes (optional) """ import os import sys import re import time import threading import queue import sqlite3 from pathlib import Path from typing import List, Tuple, Optional, Dict from docx.table import _Cell import requests # ---------- Optional deps ---------- try: import pythoncom import win32com.client as win32 from win32com.client import constants as c _WIN32COM_AVAILABLE = (sys.platform == "win32") except Exception: _WIN32COM_AVAILABLE = False try: import blingfire _HAS_BLINGFIRE = True except Exception: _HAS_BLINGFIRE = False try: import pysbd _HAS_PYSBD = True except Exception: _HAS_PYSBD = False # ---------- Office libs ---------- import docx from docx.text.paragraph import Paragraph from docx.table import Table from docx.shared import Pt from docx.oxml import OxmlElement from docx.oxml.ns import qn import pptx from pptx.util import Pt as PPTPt import openpyxl from openpyxl.styles import Alignment from openpyxl.comments import Comment from PyPDF2 import PdfReader # ---------- App constants ---------- APP_TITLE = "Document Translator (Robust, Dify)" DEFAULT_OUTPUT_DIR = "translated_files" SUPPORTED = {".docx", ".doc", ".pptx", ".xlsx", ".xls", ".pdf"} # API config is read from api.txt DIFY_API_BASE_URL = "" DIFY_API_KEY = "" # ---------- Tunables ---------- API_CONNECT_TIMEOUT_S = 10 API_READ_TIMEOUT_S = 60 API_ATTEMPTS = 3 API_BACKOFF_BASE = 1.6 SENTENCE_MODE = True INSERT_FONT_SIZE_PT = 10 EXCEL_FORMULA_MODE = "skip" # "skip" | "comment" MAX_SHAPE_CHARS = 1200 # ---------- Load API config ---------- def load_api_config_from_file(): global DIFY_API_BASE_URL, DIFY_API_KEY try: with open("api.txt", "r", encoding="utf-8") as f: for line in f: if line.startswith("base_url:"): DIFY_API_BASE_URL = line.split(":", 1)[1].strip() elif line.startswith("api:"): DIFY_API_KEY = line.split(":", 1)[1].strip() except FileNotFoundError: pass # ---------- Cache ---------- class TranslationCache: def __init__(self, db_path: Path): self.conn = sqlite3.connect(str(db_path), check_same_thread=False) self.lock = threading.Lock() with self.lock: cur = self.conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS translations( src TEXT NOT NULL, tgt TEXT NOT NULL, text TEXT NOT NULL, result TEXT NOT NULL, PRIMARY KEY (src, tgt, text) ) """) self.conn.commit() def get(self, src: str, tgt: str, text: str) -> Optional[str]: with self.lock: cur = self.conn.cursor() cur.execute("SELECT result FROM translations WHERE src=? AND tgt=? AND text=?", (src, tgt, text)) r = cur.fetchone() return r[0] if r else None def put(self, src: str, tgt: str, text: str, result: str): with self.lock: cur = self.conn.cursor() cur.execute("INSERT OR REPLACE INTO translations (src, tgt, text, result) VALUES (?, ?, ?, ?)", (src, tgt, text, result)) self.conn.commit() def close(self): with self.lock: try: self.conn.close() except Exception: pass # ---------- Text utils ---------- def _normalize_text(s: str) -> str: return re.sub(r"\s+", " ", (s or "").strip()).lower() def should_translate(text, source_lang: str) -> bool: """ Translation decision: - If source_lang starts with 'en' (English): translate any non-empty text (letters/digits/etc.). - If source_lang starts with 'auto' or is empty: translate any non-empty alnum-containing text. - Else (non-English): translate unless the text is ALL English letters OR ALL digits. """ if not text: return False if not str(text).strip(): return False s = (source_lang or "").strip().lower() filtered = "".join(ch for ch in str(text) if str(ch).isalnum()) if not filtered: return False if s.startswith("en"): return True if s.startswith("auto") or s == "": return True ASCII = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" if all((c in ASCII) for c in filtered): return False if filtered.isdigit(): return False return True def _has_cjk(s: str) -> bool: return any('\u4e00' <= ch <= '\u9fff' for ch in s or "") def _split_sentences(line: str, lang_hint: Optional[str]) -> List[str]: line = line or "" if not line.strip(): return [] if _HAS_BLINGFIRE: try: s = blingfire.text_to_sentences(line) arr = [t.strip() for t in s.split("\n") if t.strip()] if arr: return arr except Exception: pass if _HAS_PYSBD: try: seg = pysbd.Segmenter(language="en", clean=False) arr = [t.strip() for t in seg.segment(line) if t.strip()] if arr: return arr except Exception: pass # fallback: simple punctuation heuristic out, buf = [], "" for ch in line: buf += ch if ch in "。!?" or ch in ".!?": out.append(buf.strip()); buf = "" if buf.strip(): out.append(buf.strip()) return out # ---------- API ---------- class ApiError(Exception): pass class DifyClient: def __init__(self, base_url: str, api_key: str, log=lambda s: None): self.base_url = base_url.rstrip("/") self.api_key = api_key.strip() self.log = log self._resolved_path = None def _headers(self): return {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} def _try_post(self, path: str, payload: dict) -> requests.Response: url = f"{self.base_url}{path}" return requests.post(url, headers=self._headers(), json=payload, timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S)) def _detect_endpoint(self) -> str: base_has_v1 = self.base_url.rstrip("/").endswith("/v1") candidates = ["/chat-messages"] if base_has_v1 else ["/v1/chat-messages", "/chat-messages"] payload = {"inputs": {}, "query": "ping", "user": "health-check", "response_mode": "blocking"} for path in candidates: try: r = self._try_post(path, payload) if r.status_code in (200, 401, 403): self._resolved_path = path self.log(f"[API Detect] use {path} (HTTP {r.status_code})") return path if r.status_code in (404, 405): self.log(f"[API Detect] {path} not usable (HTTP {r.status_code}), trying next...") continue self.log(f"[API Detect] {path} unexpected HTTP {r.status_code}: {r.text[:180]}") except requests.exceptions.RequestException as e: self.log(f"[API Detect] {path} request error: {e}") self._resolved_path = "/v1/chat-messages" self.log("[API Detect] fallback to /v1/chat-messages") return self._resolved_path def health_check(self) -> Tuple[bool, str]: path = self._detect_endpoint() payload = {"inputs": {}, "query": "健康檢查 health check", "user": "health-check", "response_mode": "blocking"} try: r = self._try_post(path, payload) if r.status_code == 200: try: data = r.json() ans = data.get("answer", "") return True, f"OK via {path}; answer len={len(ans)}" except Exception as e: return False, f"Health JSON parse error via {path}: {e}" else: return False, f"HTTP {r.status_code} via {path}: {r.text[:180]}" except requests.exceptions.RequestException as e: return False, f"Request error via {path}: {e}" def translate_once(self, text: str, tgt: str, src_lang: Optional[str]) -> Tuple[bool, str]: if self._resolved_path is None: self._detect_endpoint() prompt = self._build_prompt(text, tgt, src_lang) payload = {"inputs": {}, "query": prompt, "user": "doc-translator-user", "response_mode": "blocking"} last = None for attempt in range(1, API_ATTEMPTS+1): try: r = self._try_post(self._resolved_path, payload) if r.status_code == 200: data = r.json() ans = data.get("answer") if isinstance(ans, str): return True, ans last = f"Invalid JSON: {data}" else: last = f"HTTP {r.status_code}: {r.text[:240]}" except requests.exceptions.RequestException as e: last = str(e) time.sleep(API_BACKOFF_BASE * attempt) return False, str(last) @staticmethod def _build_prompt(text: str, target_language: str, source_language: Optional[str]) -> str: sl = source_language if (source_language and source_language.lower() not in ("auto","auto-detect","auto detect")) else "Auto" return ( f"Task: Translate ONLY into {target_language} from {sl}.\n" f"Rules:\n" f"1) Output translation text ONLY (no source text, no notes, no questions, no language-detection remarks).\n" f"2) Preserve original line breaks.\n" f"3) Do NOT wrap in quotes or code blocks.\n\n" f"{text}" ) class OllamaClient: def __init__(self, base_url: str = "http://localhost:11434", model: str = "gpt-oss:latest", log=lambda s: None): self.base_url = base_url.rstrip("/") self.model = model self.log = log def _gen_url(self, path: str) -> str: return f"{self.base_url}{path}" def health_check(self) -> Tuple[bool, str]: try: r = requests.get(self._gen_url("/api/tags"), timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S)) if r.status_code == 200: names = [m.get("name","") for m in (r.json().get("models") or []) if isinstance(m, dict)] return True, f"OK; models={', '.join(names[:6]) + ('...' if len(names)>6 else '')}" else: return False, f"HTTP {r.status_code}: {r.text[:180]}" except requests.exceptions.RequestException as e: return False, f"Request error: {e}" def translate_once(self, text: str, tgt: str, src_lang: Optional[str]) -> Tuple[bool, str]: prompt = DifyClient._build_prompt(text, tgt, src_lang) payload = {"model": self.model, "prompt": prompt, "stream": False} last = None for attempt in range(1, API_ATTEMPTS+1): try: r = requests.post(self._gen_url("/api/generate"), json=payload, timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S)) if r.status_code == 200: data = r.json() ans = data.get("response", "") return True, ans.strip() last = f"HTTP {r.status_code}: {r.text[:180]}" except requests.exceptions.RequestException as e: last = f"Request error: {e}" return False, str(last) def list_ollama_models(base_url: str = "http://localhost:11434") -> list: try: r = requests.get(base_url.rstrip("/") + "/api/tags", timeout=(API_CONNECT_TIMEOUT_S, API_READ_TIMEOUT_S)) if r.status_code == 200: return [m.get("name","") for m in (r.json().get("models") or []) if isinstance(m, dict)] except Exception: pass return ["gpt-oss:latest"] # ---------- High-level translate helpers ---------- def translate_block_sentencewise(text: str, tgt: str, src_lang: Optional[str], cache: TranslationCache, client: DifyClient) -> Tuple[bool, str]: """ Translate a multi-line block line-by-line, sentence-wise; cache per sentence. Returns (all_ok, joined_result). """ if not text or not text.strip(): return True, "" src_key = (src_lang or "auto").lower() # Whole-block cache first cached_whole = cache.get(src_key, tgt, text) if cached_whole is not None: return True, cached_whole out_lines: List[str] = [] all_ok = True for raw_line in text.split("\n"): if not raw_line.strip(): out_lines.append("") continue sentences = _split_sentences(raw_line, src_lang) or [raw_line] parts = [] for s in sentences: c = cache.get(src_key, tgt, s) if c is not None: parts.append(c) continue ok, ans = client.translate_once(s, tgt, src_lang) if not ok: all_ok = False ans = f"【翻譯失敗|{tgt}】{s}" else: cache.put(src_key, tgt, s, ans) parts.append(ans) out_lines.append(" ".join(parts)) final = "\n".join(out_lines) if all_ok: cache.put(src_key, tgt, text, final) return all_ok, final # ---------- DOCX primitives ---------- def _p_text_with_breaks(p: Paragraph) -> str: """Read paragraph including soft line breaks and tabs.""" parts = [] for node in p._p.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"): tag = node.tag.split("}", 1)[-1] if tag == "t": parts.append(node.text or "") elif tag == "br": parts.append("\n") else: # tab parts.append(" ") return "".join(parts).strip() def _append_after(p: Paragraph, text_block: str, italic: bool=True, font_size_pt: int=INSERT_FONT_SIZE_PT) -> Paragraph: """Insert a new paragraph after p, return the new paragraph (for chain insert).""" new_p = OxmlElement("w:p") p._p.addnext(new_p) np = Paragraph(new_p, p._parent) lines = text_block.split("\n") for i, line in enumerate(lines): run = np.add_run(line) if italic: run.italic = True if font_size_pt: run.font.size = Pt(font_size_pt) if i < len(lines) - 1: run.add_break() tag = np.add_run("\u200b") if italic: tag.italic = True if font_size_pt: tag.font.size = Pt(font_size_pt) return np def _is_our_insert_block(p: Paragraph) -> bool: """Return True iff paragraph contains our zero-width marker.""" return any("\u200b" in (r.text or "") for r in p.runs) def _find_last_inserted_after(p: Paragraph, limit: int = 8) -> Optional[Paragraph]: """Return the last inserted paragraph after p (our style), else None.""" ptr = p._p.getnext() last = None steps = 0 while ptr is not None and steps < limit: if ptr.tag.endswith("}p"): q = Paragraph(ptr, p._parent) if _is_our_insert_block(q): last = q steps += 1 ptr = ptr.getnext() continue break return last def _scan_our_tail_texts(p: Paragraph, limit: int = 8) -> List[str]: """Return texts of our inserted paragraphs right after p (up to limit).""" ptr = p._p.getnext() out = [] steps = 0 while ptr is not None and steps < limit: if ptr.tag.endswith("}p"): q = Paragraph(ptr, p._parent) if _is_our_insert_block(q): out.append(_p_text_with_breaks(q)) steps += 1 ptr = ptr.getnext() continue break return out # ---------- TextBox helpers ---------- def _txbx_iter_texts(doc: docx.Document): """ Yield (txbxContent_element, joined_source_text) - Deeply collect all descendant under txbxContent - Skip our inserted translations: contains zero-width or (all italic and no CJK) - Keep only lines that still have CJK """ def _p_text_flags(p_el): parts=[] for node in p_el.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"): tag=node.tag.split('}',1)[-1] if tag=="t": parts.append(node.text or "") elif tag=="br": parts.append("\n") else: parts.append(" ") text="".join(parts) has_zero = ("\u200b" in text) runs = p_el.xpath(".//*[local-name()='r']") vis, ital = [], [] for r in runs: rt = "".join([(t.text or "") for t in r.xpath(".//*[local-name()='t']")]) if (rt or "").strip(): vis.append(rt); ital.append(bool(r.xpath(".//*[local-name()='i']"))) all_italic = (len(vis)>0 and all(ital)) return text, has_zero, all_italic for tx in doc._element.xpath(".//*[local-name()='txbxContent']"): kept=[] for p in tx.xpath(".//*[local-name()='p']"): # all descendant paragraphs text, has_zero, all_italic = _p_text_flags(p) if not (text or "").strip(): continue if has_zero: continue # our inserted for line in text.split("\n"): if line.strip(): kept.append(line.strip()) if kept: joined = "\n".join(kept) yield tx, joined def _txbx_append_paragraph(tx, text_block: str, italic: bool=True, font_size_pt: int=INSERT_FONT_SIZE_PT): p = OxmlElement("w:p") r = OxmlElement("w:r") rPr = OxmlElement("w:rPr") if italic: rPr.append(OxmlElement("w:i")) if font_size_pt: sz = OxmlElement("w:sz"); sz.set(qn("w:val"), str(int(font_size_pt*2))); rPr.append(sz) r.append(rPr) lines = text_block.split("\n") for i, line in enumerate(lines): if i>0: r.append(OxmlElement("w:br")) t = OxmlElement("w:t"); t.set(qn("xml:space"), "preserve"); t.text = line; r.append(t) tag = OxmlElement("w:t"); tag.set(qn("xml:space"), "preserve"); tag.text="\u200b"; r.append(tag) p.append(r); tx.append(p) def _txbx_tail_equals(tx, translations: List[str]) -> bool: paras = tx.xpath("./*[local-name()='p']") if len(paras) < len(translations): return False tail = paras[-len(translations):] for q, expect in zip(tail, translations): parts = [] for node in q.xpath(".//*[local-name()='t' or local-name()='br']"): tag = node.tag.split("}", 1)[-1] parts.append("\n" if tag=="br" else (node.text or "")) if _normalize_text("".join(parts).strip()) != _normalize_text(expect): return False return True # ---------- Two-phase model for DOCX ---------- class Segment: def __init__(self, kind: str, ref, ctx: str, text: str): self.kind = kind # 'para' | 'txbx' self.ref = ref self.ctx = ctx self.text = text def _get_paragraph_key(p: Paragraph) -> str: """Generate a stable unique key for paragraph deduplication.""" try: # Use XML content hash + text content for stable deduplication xml_content = p._p.xml if hasattr(p._p, 'xml') else str(p._p) text_content = _p_text_with_breaks(p) combined = f"{hash(xml_content)}_{len(text_content)}_{text_content[:50]}" return combined except Exception: # Fallback to simple text-based key text_content = _p_text_with_breaks(p) return f"fallback_{hash(text_content)}_{len(text_content)}" def _collect_docx_segments(doc: docx.Document) -> List[Segment]: """ Enhanced segment collector with improved stability. Handles paragraphs, tables, textboxes, and SDT Content Controls. """ segs: List[Segment] = [] seen_par_keys = set() # 我們需要從 docx.oxml.ns 導入命名空間前綴,以便 XPath 查詢 from docx.oxml.ns import nsdecls, qn def _add_paragraph(p: Paragraph, ctx: str): try: p_key = _get_paragraph_key(p) if p_key in seen_par_keys: return txt = _p_text_with_breaks(p) if txt.strip() and not _is_our_insert_block(p): segs.append(Segment("para", p, ctx, txt)) seen_par_keys.add(p_key) except Exception as e: # Log error but continue processing print(f"[WARNING] 段落處理錯誤: {e}, 跳過此段落") def _process_container_content(container, ctx: str): """ Recursively processes content within a container (body, cell, or SDT content). Identifies and handles paragraphs, tables, and SDT elements. """ if container._element is None: return for child_element in container._element: qname = child_element.tag if qname.endswith('}p'): # Paragraph p = Paragraph(child_element, container) _add_paragraph(p, ctx) elif qname.endswith('}tbl'): # Table table = Table(child_element, container) for r_idx, row in enumerate(table.rows, 1): for c_idx, cell in enumerate(row.cells, 1): cell_ctx = f"{ctx} > Tbl(r{r_idx},c{c_idx})" _process_container_content(cell, cell_ctx) elif qname.endswith('}sdt'): # <<<< NEW: Structured Document Tag (SDT) sdt_ctx = f"{ctx} > SDT" # 1. 提取 SDT 的元數據文本 (Placeholder, Dropdown items) # 命名空間 'w' 是必須的 ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'} # 提取 Placeholder text placeholder_texts = [] for t in child_element.xpath('.//w:placeholder//w:t', namespaces=ns): if t.text: placeholder_texts.append(t.text) if placeholder_texts: full_placeholder = "".join(placeholder_texts).strip() if full_placeholder: # 將 placeholder 視為一個特殊的段落來處理 # 注意:我們無法直接將翻譯寫回 placeholder,所以這裡我們創建一個假的 Paragraph ref # 翻譯結果將會被插入到 SDT 內容的末尾 segs.append(Segment("para", child_element, f"{sdt_ctx}-Placeholder", full_placeholder)) # 提取 Dropdown list items list_items = [] for item in child_element.xpath('.//w:dropDownList/w:listItem', namespaces=ns): display_text = item.get(qn('w:displayText')) if display_text: list_items.append(display_text) if list_items: # 將所有選項合併為一個文本塊進行翻譯 items_as_text = "\n".join(list_items) segs.append(Segment("para", child_element, f"{sdt_ctx}-Dropdown", items_as_text)) # 2. 遞迴處理 SDT 的實際內容 (sdtContent) sdt_content_element = child_element.find(qn('w:sdtContent')) if sdt_content_element is not None: # python-docx 沒有 SdtContent 的高階物件,但我們可以將 XML 元素和父級傳給遞迴函式 # 這裡我們模擬一個 container 物件,它只需要 ._element 和 ._parent 屬性 class SdtContentWrapper: def __init__(self, element, parent): self._element = element self._parent = parent sdt_content_wrapper = SdtContentWrapper(sdt_content_element, container) _process_container_content(sdt_content_wrapper, sdt_ctx) # --- Main execution starts here --- # 1. Process the main document body _process_container_content(doc._body, "Body") # 2. Process textboxes for tx, s in _txbx_iter_texts(doc): if s.strip() and (_has_cjk(s) or should_translate(s, 'auto')): segs.append(Segment("txbx", tx, "TextBox", s)) return segs def _insert_docx_translations(doc: docx.Document, segs: List[Segment], tmap: Dict[Tuple[str, str], str], targets: List[str], log=lambda s: None) -> Tuple[int, int]: """ Insert translations into DOCX document segments. CRITICAL: This function contains the fix for the major translation insertion bug. The key fix is in the segment filtering logic - we now correctly check if any target language has translation available using the proper key format (target_lang, text). Args: doc: The DOCX document object segs: List of segments to translate tmap: Translation map with keys as (target_language, source_text) targets: List of target languages in order log: Logging function Returns: Tuple of (successful_insertions, skipped_insertions) Key Bug Fix: OLD (INCORRECT): if (seg.kind, seg.text) not in tmap and (targets[0], seg.text) not in tmap NEW (CORRECT): has_any_translation = any((tgt, seg.text) in tmap for tgt in targets) """ ok_cnt = skip_cnt = 0 # Helper function to add a formatted run to a paragraph def _add_formatted_run(p: Paragraph, text: str, italic: bool, font_size_pt: int): lines = text.split("\n") for i, line in enumerate(lines): run = p.add_run(line) if italic: run.italic = True if font_size_pt: run.font.size = Pt(font_size_pt) if i < len(lines) - 1: run.add_break() # Add our zero-width space marker tag_run = p.add_run("\u200b") if italic: tag_run.italic = True if font_size_pt: tag_run.font.size = Pt(font_size_pt) for seg in segs: # Check if any target language has translation for this segment has_any_translation = any((tgt, seg.text) in tmap for tgt in targets) if not has_any_translation: log(f"[SKIP] 無翻譯結果: {seg.ctx} | {seg.text[:50]}...") continue # Get translations for all targets, with fallback for missing ones translations = [] for tgt in targets: if (tgt, seg.text) in tmap: translations.append(tmap[(tgt, seg.text)]) else: log(f"[WARNING] 缺少 {tgt} 翻譯: {seg.text[:30]}...") translations.append(f"【翻譯查詢失敗|{tgt}】{seg.text[:50]}...") log(f"[INSERT] 準備插入 {len(translations)} 個翻譯到 {seg.ctx}: {seg.text[:30]}...") if seg.kind == "para": # Check if this is an SDT segment (ref is an XML element, not a Paragraph) if hasattr(seg.ref, 'tag') and seg.ref.tag.endswith('}sdt'): # Handle SDT segments - insert translation into sdtContent sdt_element = seg.ref ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'} sdt_content = sdt_element.find(qn('w:sdtContent')) if sdt_content is not None: # Check if translations already exist existing_paras = sdt_content.xpath('.//w:p', namespaces=ns) existing_texts = [] for ep in existing_paras: p_obj = Paragraph(ep, None) if _is_our_insert_block(p_obj): existing_texts.append(_p_text_with_breaks(p_obj)) # Check if all translations already exist if len(existing_texts) >= len(translations): if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)): skip_cnt += 1 log(f"[SKIP] SDT 已存在翻譯: {seg.text[:30]}...") continue # Add translations to SDT content for t in translations: if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts): # Create new paragraph in SDT content new_p_element = OxmlElement("w:p") sdt_content.append(new_p_element) new_p = Paragraph(new_p_element, None) _add_formatted_run(new_p, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT) ok_cnt += 1 log(f"[SUCCESS] SDT 插入 {len(translations)} 個翻譯") continue p: Paragraph = seg.ref # --- CONTEXT-AWARE INSERTION LOGIC --- # Check if the paragraph's parent is a table cell if isinstance(p._parent, _Cell): cell = p._parent try: # Find the current paragraph's position in the cell cell_paragraphs = list(cell.paragraphs) p_index = -1 for idx, cell_p in enumerate(cell_paragraphs): if cell_p._element == p._element: p_index = idx break if p_index == -1: log(f"[WARNING] 無法找到段落在單元格中的位置,使用原始方法") # Fallback to original method for block in translations: new_p = cell.add_paragraph() _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT) ok_cnt += 1 continue # Check if translations already exist right after this paragraph existing_texts = [] check_limit = min(p_index + 1 + len(translations), len(cell_paragraphs)) for idx in range(p_index + 1, check_limit): if _is_our_insert_block(cell_paragraphs[idx]): existing_texts.append(_p_text_with_breaks(cell_paragraphs[idx])) # Check if all translations already exist in order if len(existing_texts) >= len(translations): if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)): skip_cnt += 1 log(f"[SKIP] 表格單元格已存在翻譯: {seg.text[:30]}...") continue # Determine which translations need to be added to_add = [] for t in translations: if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts): to_add.append(t) if not to_add: skip_cnt += 1 log(f"[SKIP] 表格單元格所有翻譯已存在: {seg.text[:30]}...") continue # Insert new paragraphs right after the current paragraph insert_after = p for block in to_add: try: # Create new paragraph and insert it after the current position new_p_element = OxmlElement("w:p") insert_after._element.addnext(new_p_element) new_p = Paragraph(new_p_element, cell) _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT) insert_after = new_p # Update position for next insertion except Exception as e: log(f"[ERROR] 表格插入失敗: {e}, 嘗試fallback方法") # Fallback: add at the end of cell try: new_p = cell.add_paragraph() _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT) log(f"[SUCCESS] Fallback插入成功") except Exception as e2: log(f"[FATAL] Fallback也失敗: {e2}") continue ok_cnt += 1 log(f"[SUCCESS] 表格單元格插入 {len(to_add)} 個翻譯(緊接原文後)") except Exception as e: log(f"[ERROR] 表格處理全面失敗: {e}, 跳過此段落") continue else: # The original logic for top-level paragraphs try: existing_texts = _scan_our_tail_texts(p, limit=max(len(translations), 4)) if existing_texts and len(existing_texts) >= len(translations): if all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing_texts[:len(translations)], translations)): skip_cnt += 1 log(f"[SKIP] 段落已存在翻譯: {seg.text[:30]}...") continue to_add = [] for t in translations: if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts): to_add.append(t) if not to_add: skip_cnt += 1 log(f"[SKIP] 段落所有翻譯已存在: {seg.text[:30]}...") continue last = _find_last_inserted_after(p, limit=max(len(translations), 4)) anchor = last if last else p for block in to_add: try: anchor = _append_after(anchor, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT) except Exception as e: log(f"[ERROR] 段落插入失敗: {e}, 嘗試簡化插入") try: # Fallback: simple append new_p = p._parent.add_paragraph(block) new_p.runs[0].italic = True if new_p.runs else None log(f"[SUCCESS] 簡化插入成功") except Exception as e2: log(f"[FATAL] 簡化插入也失敗: {e2}") continue ok_cnt += 1 log(f"[SUCCESS] 段落插入 {len(to_add)} 個翻譯") except Exception as e: log(f"[ERROR] 段落處理全面失敗: {e}, 跳過此段落") continue elif seg.kind == "txbx": tx = seg.ref if _txbx_tail_equals(tx, translations): skip_cnt += 1; continue paras = tx.xpath("./*[local-name()='p']") tail_texts = [] scan = paras[-max(len(translations), 4):] if len(paras) else [] for q in scan: has_zero = any(((t.text or "").find("\u200b") >= 0) for t in q.xpath(".//*[local-name()='t']")) if has_zero: qtxt = "".join([(node.text or "") for node in q.xpath(".//*[local-name()='t' or local-name()='br']")]).strip() tail_texts.append(qtxt) to_add = [] for t in translations: if not any(_normalize_text(t) == _normalize_text(e) for e in tail_texts): to_add.append(t) if not to_add: skip_cnt += 1; continue for block in to_add: _txbx_append_paragraph(tx, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT) ok_cnt += 1 log(f"[DOCX] 插入完成:成功 {ok_cnt} 段、略過 {skip_cnt} 段(已存在/只補缺)") return ok_cnt, skip_cnt def translate_docx(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str], cache: TranslationCache, client: DifyClient, include_headers_shapes_via_com: bool, log=lambda s: None): from shutil import copyfile copyfile(in_path, out_path) doc = docx.Document(out_path) # Health check ok, msg = client.health_check() log(f"[API Health] {msg}") if not ok: raise ApiError("API 無法連線或未授權。請檢查 base_url / api。") # Phase 1: collect segs = _collect_docx_segments(doc) log(f"[DOCX] 待翻譯段/方塊總數:{len(segs)}") # Phase 2: translate unique uniq_texts = [t for t in sorted(set(s.text for s in segs)) if should_translate(t, (src_lang or 'auto'))] tmap: Dict[Tuple[str, str], str] = {} total = len(uniq_texts) * len(targets) done = 0; fail_cnt = 0 for txt in uniq_texts: for tgt in targets: done += 1 preview = (txt.replace("\n", " ")[:40] + "..." if len(txt) > 40 else txt) log(f"[TR] {done}/{total} {tgt} len={len(txt)} 「{preview}」") if SENTENCE_MODE: ok1, res = translate_block_sentencewise(txt, tgt, src_lang, cache, client) else: ok1, res = client.translate_once(txt, tgt, src_lang) if not ok1: res = f"【翻譯失敗|{tgt}】{txt}" if not ok1: fail_cnt += 1 src_key = (src_lang or "auto").lower() if SENTENCE_MODE and ok1: cache.put(src_key, tgt, txt, res) tmap[(tgt, txt)] = res if fail_cnt: log(f"[DOCX] 翻譯失敗 {fail_cnt} 筆(以占位文寫回)") # Phase 3: insert _insert_docx_translations(doc, segs, tmap, targets, log=log) # Save docx doc.save(out_path) log(f"[DOCX] 輸出:{os.path.basename(out_path)}") # Only header/footer shapes via COM if requested if include_headers_shapes_via_com and _WIN32COM_AVAILABLE: postprocess_docx_shapes_with_word(out_path, targets, src_lang, cache, client, include_headers=True, log=log) # ---------- Windows COM helpers (optional, headers/footers only) ---------- def _com_iter(coll): try: count = coll.Count except Exception: return for i in range(1, count+1): yield coll.Item(i) def _word_convert(input_path: str, output_path: str, target_format: int): if not _WIN32COM_AVAILABLE: raise RuntimeError("Word COM not available") pythoncom.CoInitialize() try: word = win32.Dispatch("Word.Application"); word.Visible = False doc = word.Documents.Open(os.path.abspath(input_path)) doc.SaveAs2(os.path.abspath(output_path), FileFormat=target_format) doc.Close(False) finally: word.Quit(); pythoncom.CoUninitialize() def _excel_convert(input_path: str, output_path: str): if not _WIN32COM_AVAILABLE: raise RuntimeError("Excel COM not available") pythoncom.CoInitialize() try: excel = win32.Dispatch("Excel.Application"); excel.Visible = False try: excel.DisplayAlerts = False except Exception: pass wb = excel.Workbooks.Open(os.path.abspath(input_path)) wb.SaveAs(os.path.abspath(output_path), FileFormat=51) wb.Close(SaveChanges=False) finally: excel.Quit(); pythoncom.CoUninitialize() def postprocess_docx_shapes_with_word(docx_path: str, targets: List[str], src_lang: Optional[str], cache: TranslationCache, client: DifyClient, include_headers: bool=False, log=lambda s: None): # Only when explicitly requested, and headers/footers only if not _WIN32COM_AVAILABLE or not include_headers: return pythoncom.CoInitialize() try: word = win32.Dispatch("Word.Application"); word.Visible = False try: word.ScreenUpdating = False except Exception: pass try: word.DisplayAlerts = 0 except Exception: pass doc = word.Documents.Open(os.path.abspath(docx_path)) def _proc_shapes(shapes): for shp in _com_iter(shapes): try: tf = getattr(shp, "TextFrame", None) if tf and getattr(tf, "HasText", False): src = tf.TextRange.Text if not src or not src.strip(): continue if len(src) > MAX_SHAPE_CHARS: log(f"[Skip shape] too long ({len(src)} chars)"); continue blocks=[] for tgt in targets: ok, tr = translate_block_sentencewise(src, tgt, src_lang, cache, client) if not ok: tr = f"【翻譯失敗|{tgt}】{src}" blocks.append(tr) suffix = "\r" + "\r".join(blocks) full = tf.TextRange.Text or "" if _normalize_text(full[-len(suffix):]) == _normalize_text(suffix): continue tf.TextRange.InsertAfter(suffix) try: dup = tf.TextRange.Duplicate start = len(full) + 1; end = dup.Characters.Count dup.SetRange(start, end); dup.Font.Italic = True except Exception: pass except Exception as e: log(f"[COM shape error] {e}") # headers/footers only for sec in _com_iter(doc.Sections): try: _proc_shapes(sec.Headers(c.wdHeaderFooterPrimary).Shapes) _proc_shapes(sec.Headers(c.wdHeaderFooterFirstPage).Shapes) _proc_shapes(sec.Headers(c.wdHeaderFooterEvenPages).Shapes) _proc_shapes(sec.Footers(c.wdHeaderFooterPrimary).Shapes) _proc_shapes(sec.Footers(c.wdHeaderFooterFirstPage).Shapes) _proc_shapes(sec.Footers(c.wdHeaderFooterEvenPages).Shapes) except Exception: pass doc.Save(); doc.Close(False) finally: try: word.ScreenUpdating = True except Exception: pass word.Quit(); pythoncom.CoUninitialize() # ---------- PPTX ---------- def _ppt_text_of_tf(tf) -> str: return "\n".join([p.text for p in tf.paragraphs]) def _ppt_tail_equals(tf, translations: List[str]) -> bool: if len(tf.paragraphs) < len(translations): return False tail = tf.paragraphs[-len(translations):] for para, expect in zip(tail, translations): if _normalize_text(para.text) != _normalize_text(expect): return False if any((r.font.italic is not True) and (r.text or "").strip() for r in para.runs): return False return True def _ppt_append(tf, text_block: str): p = tf.add_paragraph() p.text = text_block for r in p.runs: r.font.italic = True r.font.size = PPTPt(12) def translate_pptx(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str], cache: TranslationCache, client: DifyClient, log=lambda s: None): prs = pptx.Presentation(in_path) segs=[] for slide in prs.slides: for sh in slide.shapes: if not getattr(sh, "has_text_frame", False): continue tf = sh.text_frame txt = _ppt_text_of_tf(tf) if txt.strip(): segs.append((tf, txt)) log(f"[PPTX] 待翻譯區塊:{len(segs)}") uniq = [s for s in sorted(set(s for _, s in segs)) if should_translate(s, (src_lang or 'auto'))] tmap: Dict[Tuple[str, str], str] = {} for s in uniq: for tgt in targets: ok, res = translate_block_sentencewise(s, tgt, src_lang, cache, client) if not ok: res = f"【翻譯失敗|{tgt}】{s}" tmap[(tgt, s)] = res ok_cnt=skip_cnt=0 for tf, s in segs: trs = [tmap[(tgt, s)] for tgt in targets] if _ppt_tail_equals(tf, trs): skip_cnt += 1; continue for block in trs: _ppt_append(tf, block) ok_cnt += 1 prs.save(out_path) log(f"[PPTX] 插入完成:成功 {ok_cnt}、略過 {skip_cnt} → {os.path.basename(out_path)}") # ---------- XLSX/XLS ---------- def _get_display_text_for_translation(ws, ws_vals, r: int, c: int) -> Optional[str]: val = ws.cell(row=r, column=c).value if isinstance(val, str) and val.startswith("="): if ws_vals is not None: shown = ws_vals.cell(row=r, column=c).value return shown if isinstance(shown, str) and shown.strip() else None return None if isinstance(val, str) and val.strip(): return val if ws_vals is not None: shown = ws_vals.cell(row=r, column=c).value if isinstance(shown, str) and shown.strip(): return shown return None def translate_xlsx_xls(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str], cache: TranslationCache, client: DifyClient, excel_formula_mode: str = EXCEL_FORMULA_MODE, log=lambda s: None): ext = Path(in_path).suffix.lower() out_xlsx = Path(out_path).with_suffix(".xlsx") if ext == ".xls" and _WIN32COM_AVAILABLE: tmp = str(Path(out_path).with_suffix("")) + "__from_xls.xlsx" try: log("[XLS] 使用 Excel COM 轉檔為 .xlsx …") _excel_convert(in_path, tmp) translate_xlsx_xls(tmp, out_path, targets, src_lang, cache, client, excel_formula_mode=excel_formula_mode, log=log) finally: try: os.remove(tmp) except Exception: pass return if ext not in (".xlsx", ".xls"): raise RuntimeError("Unsupported Excel type") wb = openpyxl.load_workbook(in_path, data_only=False) try: wb_vals = openpyxl.load_workbook(in_path, data_only=True) except Exception: wb_vals = None segs=[] for ws in wb.worksheets: ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None max_row, max_col = ws.max_row, ws.max_column for r in range(1, max_row+1): for c in range(1, max_col+1): src_text = _get_display_text_for_translation(ws, ws_vals, r, c) if not src_text: continue if not should_translate(src_text, (src_lang or 'auto')): continue val = ws.cell(row=r, column=c).value is_formula = isinstance(val, str) and val.startswith("=") segs.append((ws.title, r, c, src_text, is_formula)) log(f"[Excel] 待翻譯儲存格:{len(segs)}") uniq = sorted(set(s[3] for s in segs)) tmap: Dict[Tuple[str, str], str] = {} for text in uniq: for tgt in targets: ok, res = translate_block_sentencewise(text, tgt, src_lang, cache, client) if not ok: res = f"【翻譯失敗|{tgt}】{text}" tmap[(tgt, text)] = res for sheet_name, r, c, src_text, is_formula in segs: ws = wb[sheet_name] trs = [tmap[(tgt, src_text)] for tgt in targets] if is_formula: if excel_formula_mode == "skip": continue elif excel_formula_mode == "comment": txt_comment = "\n".join([f"[{t}] {res}" for t, res in zip(targets, trs)]) cell = ws.cell(row=r, column=c) exist = cell.comment if not exist or _normalize_text(exist.text) != _normalize_text(txt_comment): cell.comment = Comment(txt_comment, "translator") continue else: continue combined = "\n".join([src_text] + trs) cell = ws.cell(row=r, column=c) if isinstance(cell.value, str) and _normalize_text(cell.value) == _normalize_text(combined): continue cell.value = combined try: if cell.alignment: cell.alignment = Alignment(horizontal=cell.alignment.horizontal, vertical=cell.alignment.vertical, wrap_text=True) else: cell.alignment = Alignment(wrap_text=True) except Exception: cell.alignment = Alignment(wrap_text=True) wb.save(out_xlsx) log(f"[Excel] 輸出:{out_xlsx.name}") # ---------- PDF ---------- def translate_pdf(in_path: str, out_path: str, targets: List[str], src_lang: Optional[str], cache: TranslationCache, client: DifyClient, log=lambda s: None): temp_docx = str(Path(out_path).with_suffix("")) + "__from_pdf.docx" if _WIN32COM_AVAILABLE: try: _word_convert(in_path, temp_docx, 16) translate_docx(temp_docx, out_path, targets, src_lang, cache, client, include_headers_shapes_via_com=False, log=log) try: os.remove(temp_docx) except Exception: pass return except Exception as e: log(f"[PDF] Word import failed, fallback to text extract: {e}") doc = docx.Document() try: reader = PdfReader(in_path) for i, page in enumerate(reader.pages, start=1): doc.add_heading(f"— Page {i} —", level=1) text = page.extract_text() or "" if text.strip(): doc.add_paragraph(text) for tgt in targets: ok, tr = translate_block_sentencewise(text, tgt, src_lang, cache, client) if not ok: tr = f"【翻譯失敗|{tgt}】{text}" p = doc.add_paragraph("") lines = tr.split("\n") for j, line in enumerate(lines): r = p.add_run(line); r.italic = True; r.font.size = Pt(INSERT_FONT_SIZE_PT) if j < len(lines)-1: r.add_break() tag = p.add_run("\u200b"); tag.italic = True; tag.font.size = Pt(INSERT_FONT_SIZE_PT) else: doc.add_paragraph("[Empty or image-only page]") except Exception as e: doc.add_paragraph(f"[PDF extract error] {e}") doc.save(out_path) log(f"[PDF] 輸出(docx 報告):{os.path.basename(out_path)}") # ---------- Orchestrator ---------- def process_path(input_path: Path, output_dir: Path, targets: List[str], src_lang: Optional[str], base_url: str, api_key: str, cache: TranslationCache, recurse: bool, include_headers_shapes_via_com: bool, backend: str = 'Ollama', ollama_model: str = 'gpt-oss:latest', log=lambda s: None): if not input_path.exists(): raise FileNotFoundError(f"Input not found: {input_path}") output_dir.mkdir(parents=True, exist_ok=True) client = (DifyClient(base_url, api_key, log=log) if backend.lower()=="dify" else OllamaClient(model=ollama_model, log=log)) files: List[Path] if input_path.is_dir(): candidates = input_path.rglob("*") if recurse else input_path.glob("*") files = [p for p in candidates if p.is_file() and p.suffix.lower() in SUPPORTED] log(f"[Folder] 掃描到 {len(files)} 個支援檔案") else: files = [input_path] if input_path.suffix.lower() in SUPPORTED else [] if not files: log("Selected file type is not supported."); return for src in files: ext = src.suffix.lower() stem = src.stem out_name = f"{stem}_translated{ext if ext in ('.docx','.pptx','.xlsx') else ('.docx' if ext in ('.doc','.pdf') else ext)}" out_path = output_dir / out_name log("="*24); log(f"處理:{src.name}") try: if ext == ".docx": translate_docx(str(src), str(out_path), targets, src_lang, cache, client, include_headers_shapes_via_com=include_headers_shapes_via_com, log=log) elif ext == ".doc": tmp_docx = str(output_dir / f"{stem}__tmp.docx") if _WIN32COM_AVAILABLE: _word_convert(str(src), tmp_docx, 16) translate_docx(tmp_docx, str(out_path), targets, src_lang, cache, client, include_headers_shapes_via_com=include_headers_shapes_via_com, log=log) try: os.remove(tmp_docx) except Exception: pass else: log("[DOC] 無法使用 Word COM,請先轉為 .docx") elif ext == ".pptx": translate_pptx(str(src), str(out_path), targets, src_lang, cache, client, log=log) elif ext in (".xlsx", ".xls"): translate_xlsx_xls(str(src), str(out_path), targets, src_lang, cache, client, log=log) elif ext == ".pdf": translate_pdf(str(src), str(out_path), targets, src_lang, cache, client, log=log) log(f"完成:{src.name} → {out_path.name}") except ApiError as e: log(f"[FATAL] {src.name}: {e}") except Exception as e: log(f"[FATAL] {src.name}: {e}") # ---------- GUI ---------- import tkinter as tk from tkinter import ttk, filedialog, messagebox COMMON_LANGS = [ "English","Vietnamese","Traditional Chinese","Simplified Chinese","Japanese","Korean", "Thai","Indonesian","French","German","Spanish","Portuguese","Italian","Russian","Arabic","Hindi" ] class TranslatorGUI(tk.Tk): def __init__(self): super().__init__() self.title(APP_TITLE); self.geometry("1040x900") self.stop_flag = threading.Event() self.worker_thread: Optional[threading.Thread] = None self.log_queue: "queue.Queue[str]" = queue.Queue() self.cache: Optional[TranslationCache] = None self._build_ui() load_api_config_from_file() if DIFY_API_BASE_URL: self.base_url_var.set(DIFY_API_BASE_URL) if DIFY_API_KEY: self.api_key_var.set(DIFY_API_KEY) try: self._refresh_ollama_models() except Exception: pass self.after(100, self._drain_log_queue) def _build_ui(self): pad = {"padx":8,"pady":4} # Paths frm_path = ttk.LabelFrame(self, text="Paths"); frm_path.pack(fill="x", **pad) self.input_mode_var = tk.StringVar(value="file") self.in_path_var = tk.StringVar() self.out_dir_var = tk.StringVar(value=DEFAULT_OUTPUT_DIR) self.recurse_var = tk.BooleanVar(value=True) ttk.Radiobutton(frm_path, text="Single File", value="file", variable=self.input_mode_var).grid(row=0, column=0, sticky="w") ttk.Radiobutton(frm_path, text="Folder", value="folder", variable=self.input_mode_var).grid(row=0, column=1, sticky="w") ttk.Label(frm_path, text="Input path:").grid(row=1, column=0, sticky="w") ttk.Entry(frm_path, textvariable=self.in_path_var, width=74).grid(row=1, column=1, sticky="we") ttk.Button(frm_path, text="Browse...", command=self._browse_input).grid(row=1, column=2, sticky="e") ttk.Checkbutton(frm_path, text="Recurse subfolders (folder mode)", variable=self.recurse_var).grid(row=2, column=1, sticky="w") ttk.Label(frm_path, text="Output folder:").grid(row=3, column=0, sticky="w") ttk.Entry(frm_path, textvariable=self.out_dir_var, width=74).grid(row=3, column=1, sticky="we") ttk.Button(frm_path, text="Browse...", command=self._browse_output).grid(row=3, column=2, sticky="e") frm_path.columnconfigure(1, weight=1) # API frm_api = ttk.LabelFrame(self, text="Backend & API"); frm_api.pack(fill="x", **pad) self.backend_var = tk.StringVar(value="Ollama") ttk.Label(frm_api, text="Backend:").grid(row=0, column=0, sticky="w") ttk.Combobox(frm_api, textvariable=self.backend_var, values=["Ollama","Dify"], width=18, state="readonly").grid(row=0, column=1, sticky="w") # Dify settings self.base_url_var = tk.StringVar(); self.api_key_var = tk.StringVar() ttk.Label(frm_api, text="Dify Base URL:").grid(row=1, column=0, sticky="w") ttk.Entry(frm_api, textvariable=self.base_url_var, width=60).grid(row=1, column=1, sticky="we") ttk.Label(frm_api, text="Dify API Key:").grid(row=2, column=0, sticky="w") ttk.Entry(frm_api, textvariable=self.api_key_var, width=60, show="•").grid(row=2, column=1, sticky="we") # Ollama model self.ollama_model_var = tk.StringVar(value="gpt-oss:latest") ttk.Label(frm_api, text="Ollama Model:").grid(row=3, column=0, sticky="w") self.cmb_ollama = ttk.Combobox(frm_api, textvariable=self.ollama_model_var, values=[], width=40) self.cmb_ollama.grid(row=3, column=1, sticky="w") ttk.Button(frm_api, text="Refresh Models", command=self._refresh_ollama_models).grid(row=3, column=2, sticky="w") frm_api.columnconfigure(1, weight=1) # Languages & Order frm_lang = ttk.LabelFrame(self, text="Languages & Order"); frm_lang.pack(fill="x", **pad) self.src_lang_var = tk.StringVar(value="Auto") ttk.Label(frm_lang, text="Source:").grid(row=0, column=0, sticky="w") ttk.Combobox(frm_lang, textvariable=self.src_lang_var, values=["Auto"] + COMMON_LANGS, width=24, state="readonly").grid(row=0, column=1, sticky="w") ttk.Label(frm_lang, text="Targets (select & reorder):").grid(row=1, column=0, sticky="nw") self.lst_targets = tk.Listbox(frm_lang, selectmode="extended", height=10, exportselection=False) for lang in COMMON_LANGS: self.lst_targets.insert(tk.END, lang) # 預設英、越 self.lst_targets.selection_set(0) self.lst_targets.selection_set(1) self.lst_targets.grid(row=1, column=1, sticky="we") frm_lang.columnconfigure(1, weight=1) side = ttk.Frame(frm_lang); side.grid(row=1, column=2, sticky="nsw", padx=8) ttk.Button(side, text="▲ Move Up", command=self._move_up).pack(anchor="w", pady=(0,4)) ttk.Button(side, text="▼ Move Down", command=self._move_down).pack(anchor="w", pady=(0,8)) ttk.Label(side, text="Hint: 順序 = 輸出順序").pack(anchor="w") self.sel_summary_var = tk.StringVar(value="Selected: English, Vietnamese") ttk.Label(frm_lang, textvariable=self.sel_summary_var).grid(row=2, column=1, sticky="w", pady=(6,0)) # Options frm_opt = ttk.LabelFrame(self, text="Options"); frm_opt.pack(fill="x", **pad) self.include_headers_var = tk.BooleanVar(value=False) ttk.Checkbutton(frm_opt, text="Include headers/footers Shapes via Word COM (Windows only)", variable=self.include_headers_var).grid(row=0, column=0, sticky="w") # Controls frm_ctl = ttk.Frame(self); frm_ctl.pack(fill="x", **pad) ttk.Button(frm_ctl, text="Start", command=self._on_start).pack(side="left", padx=4) ttk.Button(frm_ctl, text="Resume", command=self._on_resume).pack(side="left", padx=4) ttk.Button(frm_ctl, text="Stop", command=self._on_stop).pack(side="left", padx=4) ttk.Button(frm_ctl, text="Clear Log", command=self._clear_log).pack(side="left", padx=4) # Log frm_log = ttk.LabelFrame(self, text="Log"); frm_log.pack(fill="both", expand=True, **pad) self.txt_log = tk.Text(frm_log, wrap="word", height=22); self.txt_log.pack(fill="both", expand=True) self.lst_targets.bind("<>", lambda e: self._update_target_summary()) # --- UI helpers --- def _browse_input(self): if self.input_mode_var.get() == "file": p = filedialog.askopenfilename( title="Choose a file", filetypes=[("Supported","*.docx *.doc *.pptx *.xlsx *.xls *.pdf"), ("All files","*.*")] ) else: p = filedialog.askdirectory(title="Choose a folder") if p: self.in_path_var.set(p) def _browse_output(self): p = filedialog.askdirectory(title="Choose output folder") if p: self.out_dir_var.set(p) def _log(self, s: str): self.log_queue.put(s) def _drain_log_queue(self): try: while True: s = self.log_queue.get_nowait() self.txt_log.insert(tk.END, s + "\n"); self.txt_log.see(tk.END) except queue.Empty: pass self.after(120, self._drain_log_queue) def _collect_targets(self) -> List[str]: sel = set(self.lst_targets.curselection()) return [self.lst_targets.get(i) for i in range(self.lst_targets.size()) if i in sel] def _update_target_summary(self): tgts = self._collect_targets() self.sel_summary_var.set("Selected: " + (", ".join(tgts) if tgts else "(none)")) def _move_up(self): sel = list(self.lst_targets.curselection()) if not sel: return for idx in sel: if idx == 0: continue text = self.lst_targets.get(idx) self.lst_targets.delete(idx) self.lst_targets.insert(idx-1, text) self.lst_targets.selection_set(idx-1) self._update_target_summary() def _move_down(self): sel = list(self.lst_targets.curselection()) if not sel: return for idx in reversed(sel): if idx == self.lst_targets.size()-1: continue text = self.lst_targets.get(idx) self.lst_targets.delete(idx) self.lst_targets.insert(idx+1, text) self.lst_targets.selection_set(idx+1) self._update_target_summary() def _refresh_ollama_models(self): try: models = list_ollama_models() if models: self.cmb_ollama['values'] = models if self.ollama_model_var.get() not in models: self.ollama_model_var.set(models[0]) self._log(f"[Ollama] Models: {', '.join(models)}") else: self._log("[Ollama] No models found.") except Exception as e: self._log(f"[Ollama] List models failed: {e}") def _start_worker(self, resume: bool=False): base = self.base_url_var.get().strip().rstrip("/") key = self.api_key_var.get().strip() backend = self.backend_var.get().strip() if backend == 'Dify' and (not base or not key): messagebox.showerror("API", "Please set Dify Base URL and API Key."); return targets = self._collect_targets() if not targets: messagebox.showerror("Targets", "Please choose at least one target language."); return in_path = Path(self.in_path_var.get().strip()) if not in_path.exists(): messagebox.showerror("Input", "Input path does not exist."); return out_dir = Path(self.out_dir_var.get().strip() or DEFAULT_OUTPUT_DIR) out_dir.mkdir(parents=True, exist_ok=True) if self.cache is None: self.cache = TranslationCache(out_dir / "translation_cache.db") include_headers = bool(self.include_headers_var.get()) recurse = bool(self.recurse_var.get()) src_sel = self.src_lang_var.get().strip() src_lang = None if src_sel.lower() == "auto" else src_sel self._log(f"Targets (order): {', '.join(targets)}") self._log(f"Input: {in_path}") self._log(f"Output: {out_dir}") self._log(f"Include header/footer shapes via COM: {include_headers and _WIN32COM_AVAILABLE}") def work(): try: process_path(in_path, out_dir, targets, src_lang, base, key, self.cache, recurse=recurse, include_headers_shapes_via_com=include_headers, backend=backend, ollama_model=self.ollama_model_var.get(), log=self._log) except Exception as e: self._log(f"[Worker error] {e}") finally: self._log("Task finished.") if self.worker_thread and self.worker_thread.is_alive(): messagebox.showinfo("Running", "Task is already running."); return self.worker_thread = threading.Thread(target=work, daemon=True) self.worker_thread.start() def _on_start(self): self.txt_log.insert(tk.END, "== Start ==\n") self._start_worker(resume=False) def _on_resume(self): self.txt_log.insert(tk.END, "== Resume ==\n") self._start_worker(resume=True) def _on_stop(self): self._log("Stop requested (new files won't start).") def _clear_log(self): self.txt_log.delete("1.0", tk.END) def on_close(self): try: if self.cache: self.cache.close() except Exception: pass self.destroy() # ---------- Main ---------- def main(): app = TranslatorGUI() app.protocol("WM_DELETE_WINDOW", app.on_close) app.mainloop() if __name__ == "__main__": if len(sys.argv) == 1: main() else: load_api_config_from_file() if len(sys.argv) < 4: print("用法: python document_translator_gui.py <檔案或資料夾> <輸出資料夾> <目標語言以逗號分隔> [--headers]") sys.exit(1) inp = Path(sys.argv[1]); outd = Path(sys.argv[2]); tgts = [t.strip() for t in sys.argv[3].split(",")] include_headers = ("--headers" in sys.argv) outd.mkdir(parents=True, exist_ok=True) cache = TranslationCache(outd / "translation_cache.db") try: process_path(inp, outd, tgts, src_lang=None, base_url=DIFY_API_BASE_URL.strip().rstrip("/"), api_key=DIFY_API_KEY.strip(), cache=cache, recurse=True, include_headers_shapes_via_com=include_headers, log=lambda s: print(s)) finally: cache.close()