backup
This commit is contained in:
19
app/services/__init__.py
Normal file
19
app/services/__init__.py
Normal file
@@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
業務服務模組
|
||||
|
||||
Author: PANJIT IT Team
|
||||
Created: 2024-01-28
|
||||
Modified: 2024-01-28
|
||||
"""
|
||||
|
||||
from .dify_client import DifyClient
|
||||
from .translation_service import TranslationService
|
||||
from .notification_service import NotificationService
|
||||
|
||||
__all__ = [
|
||||
'DifyClient',
|
||||
'TranslationService',
|
||||
'NotificationService'
|
||||
]
|
137
app/services/celery_service.py
Normal file
137
app/services/celery_service.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Celery任務管理服務
|
||||
|
||||
Author: PANJIT IT Team
|
||||
Created: 2025-09-04
|
||||
"""
|
||||
|
||||
from celery import Celery
|
||||
from app.utils.logger import get_logger
|
||||
import os
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
def get_celery_app():
    """Return the Celery application used by this service.

    The project-level ``celery_app`` module is preferred. When it cannot be
    imported, a minimal Celery instance named ``translation_worker`` is
    created, with the broker taken from the ``REDIS_URL`` environment
    variable (default ``redis://localhost:6379/0``).
    """
    try:
        from celery_app import app
    except ImportError:
        # Fallback: nothing importable, so build a bare instance on the fly.
        redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
        return Celery('translation_worker', broker=redis_url)
    return app
|
||||
|
||||
|
||||
def revoke_task(job_uuid):
    """Ask Celery to revoke (and kill) the task belonging to a job.

    Args:
        job_uuid (str): UUID of the translation job.

    Returns:
        bool: True when the revoke command was issued without raising,
        False when any exception occurred (the error is logged).
    """
    try:
        app = get_celery_app()

        # The Celery task id is derived from the job UUID.
        target_id = f"translate_document_{job_uuid}"

        # terminate=True with SIGKILL also stops a task that is already running.
        app.control.revoke(target_id, terminate=True, signal='SIGKILL')

        logger.info(f"Successfully revoked Celery task: {target_id}")
        return True
    except Exception as exc:
        logger.error(f"Failed to revoke Celery task for job {job_uuid}: {str(exc)}")
        return False
|
||||
|
||||
|
||||
def get_active_tasks():
    """Fetch the tasks that Celery workers report as currently active.

    Returns:
        dict: The value returned by ``control.inspect().active()`` when it is
        truthy; an empty dict when that value is falsy or when any exception
        is raised (the error is logged).
    """
    try:
        inspector = get_celery_app().control.inspect()
        return inspector.active() or {}
    except Exception as exc:
        logger.error(f"Failed to get active tasks: {str(exc)}")
        return {}
|
||||
|
||||
|
||||
def is_task_active(job_uuid):
    """Tell whether the Celery task of a job is currently active.

    Args:
        job_uuid (str): UUID of the translation job.

    Returns:
        bool: True if any worker lists a task whose ``id`` equals
        ``translate_document_<job_uuid>``; False otherwise, including when
        an exception occurs (the error is logged).
    """
    try:
        per_worker = get_active_tasks()
        wanted_id = f"translate_document_{job_uuid}"

        # Scan the active-task lists of every worker.
        return any(
            entry.get('id') == wanted_id
            for entries in per_worker.values()
            for entry in entries
        )
    except Exception as exc:
        logger.error(f"Failed to check if task is active for job {job_uuid}: {str(exc)}")
        return False
|
||||
|
||||
|
||||
def cleanup_stale_tasks():
    """Mark stuck translation jobs as failed.

    A job is a candidate when its status is ``PROCESSING`` and its
    ``processing_started_at`` is more than 30 minutes before the current UTC
    time. Candidates whose Celery task is not reported as active are moved to
    ``FAILED``.

    Returns:
        int: Number of jobs marked as failed; 0 when any exception occurs
        (the error is logged).
    """
    try:
        from datetime import datetime, timedelta
        from app.models.job import TranslationJob

        # Jobs that started processing before this moment count as stale.
        cutoff = datetime.utcnow() - timedelta(minutes=30)
        candidates = TranslationJob.query.filter(
            TranslationJob.status == 'PROCESSING',
            TranslationJob.processing_started_at < cutoff
        ).all()

        cleaned = 0
        for job in candidates:
            if is_task_active(job.job_uuid):
                # Still running in Celery; leave it alone.
                continue
            job.update_status('FAILED', error_message='任務處理超時,已自動取消')
            cleaned += 1
            logger.info(f"Cleaned up stale job: {job.job_uuid}")

        return cleaned
    except Exception as exc:
        logger.error(f"Failed to cleanup stale tasks: {str(exc)}")
        return 0
|
302
app/services/dify_client.py
Normal file
302
app/services/dify_client.py
Normal file
@@ -0,0 +1,302 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Dify API 客戶端服務
|
||||
|
||||
Author: PANJIT IT Team
|
||||
Created: 2024-01-28
|
||||
Modified: 2024-01-28
|
||||
"""
|
||||
|
||||
import time
|
||||
import requests
|
||||
from typing import Dict, Any, Optional
|
||||
from flask import current_app
|
||||
from app.utils.logger import get_logger
|
||||
from app.utils.exceptions import APIError
|
||||
from app.models.stats import APIUsageStats
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class DifyClient:
    """Client for the Dify HTTP API.

    ``DIFY_API_BASE_URL`` and ``DIFY_API_KEY`` are read from the Flask
    ``current_app`` configuration when an instance is created, so instances
    must be built inside an application context.
    """

    # Language names used in the translation prompt. Codes that are not
    # listed here are inserted into the prompt unchanged.
    _LANGUAGE_NAMES = {
        'zh-tw': 'Traditional Chinese',
        'zh-cn': 'Simplified Chinese',
        'en': 'English',
        'ja': 'Japanese',
        'ko': 'Korean',
        'vi': 'Vietnamese',
        'th': 'Thai',
        'id': 'Indonesian',
        'ms': 'Malay',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'ru': 'Russian',
        'ar': 'Arabic'
    }

    def __init__(self):
        """Read the API settings from the app config and set retry defaults."""
        self.base_url = current_app.config.get('DIFY_API_BASE_URL', '')
        self.api_key = current_app.config.get('DIFY_API_KEY', '')
        self.timeout = (10, 60)  # (connect timeout, read timeout) in seconds
        self.max_retries = 3
        self.retry_delay = 1.6  # base of the exponential back-off

        if not self.base_url or not self.api_key:
            logger.warning("Dify API configuration is incomplete")

    def _make_request(self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None,
                      user_id: Optional[int] = None, job_id: Optional[int] = None) -> Dict[str, Any]:
        """Send an HTTP request to the Dify API, retrying on request errors.

        Args:
            method: ``'GET'`` (case-insensitive) sends ``data`` as query
                parameters; any other value sends a POST with ``data`` as JSON.
            endpoint: Path appended to the configured base URL.
            data: Query parameters or JSON payload.
            user_id: When truthy, every attempt is recorded as API usage.
            job_id: Job id forwarded to the usage record.

        Returns:
            The decoded JSON body of the first successful response.

        Raises:
            APIError: If the client is not configured, or if every attempt
                raised a ``requests`` exception.
        """
        if not self.base_url or not self.api_key:
            raise APIError("Dify API 未配置完整")

        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"

        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
            'User-Agent': 'PANJIT-Document-Translator/1.0'
        }

        last_exception = None

        for attempt in range(self.max_retries):
            # Time every attempt separately so the recorded latency does not
            # include earlier failed attempts or the back-off sleeps.
            attempt_started = time.time()
            try:
                logger.debug(f"Making Dify API request: {method} {url} (attempt {attempt + 1})")

                if method.upper() == 'GET':
                    response = requests.get(url, headers=headers, timeout=self.timeout, params=data)
                else:
                    response = requests.post(url, headers=headers, timeout=self.timeout, json=data)

                response_time_ms = int((time.time() - attempt_started) * 1000)

                # HTTP error statuses raise and are handled like any other
                # request failure below.
                response.raise_for_status()

                result = response.json()

                # Record usage statistics for the successful call.
                if user_id:
                    self._record_api_usage(
                        user_id=user_id,
                        job_id=job_id,
                        endpoint=endpoint,
                        response_data=result,
                        response_time_ms=response_time_ms,
                        success=True
                    )

                logger.debug(f"Dify API request successful: {response_time_ms}ms")
                return result

            except requests.exceptions.RequestException as e:
                last_exception = e
                response_time_ms = int((time.time() - attempt_started) * 1000)

                # Record the failed call as well.
                if user_id:
                    self._record_api_usage(
                        user_id=user_id,
                        job_id=job_id,
                        endpoint=endpoint,
                        response_data={},
                        response_time_ms=response_time_ms,
                        success=False,
                        error_message=str(e)
                    )

                logger.warning(f"Dify API request failed (attempt {attempt + 1}): {str(e)}")

                # Last attempt: stop retrying and report the failure below.
                if attempt == self.max_retries - 1:
                    break

                # Exponential back-off: retry_delay ** attempt seconds.
                delay = self.retry_delay ** attempt
                logger.debug(f"Retrying in {delay} seconds...")
                time.sleep(delay)

        # Every attempt failed.
        error_msg = f"Dify API request failed after {self.max_retries} attempts: {str(last_exception)}"
        logger.error(error_msg)
        raise APIError(error_msg) from last_exception

    def _record_api_usage(self, user_id: int, job_id: Optional[int], endpoint: str,
                          response_data: Dict, response_time_ms: int, success: bool,
                          error_message: Optional[str] = None):
        """Store one API call in the usage statistics.

        Failures while recording are logged as warnings and never propagate,
        so statistics cannot break a request.
        """
        try:
            # Usage information is taken from the response's 'metadata' entry.
            metadata = response_data.get('metadata', {})

            APIUsageStats.record_api_call(
                user_id=user_id,
                job_id=job_id,  # forwarded unchanged; may be None
                api_endpoint=endpoint,
                metadata=metadata,
                response_time_ms=response_time_ms,
                success=success,
                error_message=error_message
            )
        except Exception as e:
            logger.warning(f"Failed to record API usage: {str(e)}")

    def translate_text(self, text: str, source_language: str, target_language: str,
                       user_id: Optional[int] = None, job_id: Optional[int] = None) -> Dict[str, Any]:
        """Translate ``text`` through the ``/chat-messages`` endpoint.

        Args:
            text: Source text; must contain non-whitespace characters.
            source_language: Source language code.
            target_language: Target language code.
            user_id: Optional user id, used for the ``user`` field and for
                usage statistics.
            job_id: Optional job id for usage statistics.

        Returns:
            A dict with ``success``, ``translated_text``, ``source_text``,
            ``source_language``, ``target_language`` and ``metadata``.

        Raises:
            APIError: If the text is blank, the request fails, the answer is
                empty or not a string, or any other error occurs.
        """
        if not text.strip():
            raise APIError("翻譯文字不能為空")

        # Build the translation prompt (instructions are written in English).
        source_lang_name = self._LANGUAGE_NAMES.get(source_language, source_language)
        target_lang_name = self._LANGUAGE_NAMES.get(target_language, target_language)

        query = f"""Task: Translate ONLY into {target_lang_name} from {source_lang_name}.

Rules:
- Output translation text ONLY (no source text, no notes, no questions, no language-detection remarks).
- Preserve original line breaks.
- Do NOT wrap in quotes or code blocks.
- Maintain original formatting and structure.

{text.strip()}"""

        request_data = {
            'inputs': {},
            'response_mode': 'blocking',
            'user': f"user_{user_id}" if user_id else "doc-translator-user",
            'query': query
        }

        try:
            response = self._make_request(
                method='POST',
                endpoint='/chat-messages',
                data=request_data,
                user_id=user_id,
                job_id=job_id
            )

            # The translation is carried in the 'answer' field.
            answer = response.get('answer')

            if not isinstance(answer, str) or not answer.strip():
                raise APIError("Dify API 返回空的翻譯結果")

            return {
                'success': True,
                'translated_text': answer,
                'source_text': text,
                'source_language': source_language,
                'target_language': target_language,
                'metadata': response.get('metadata', {})
            }

        except APIError:
            raise
        except Exception as e:
            error_msg = f"翻譯請求處理錯誤: {str(e)}"
            logger.error(error_msg)
            raise APIError(error_msg) from e

    def test_connection(self) -> bool:
        """Probe the API with a small POST to ``/chat-messages``.

        Returns:
            True when the request returned a response; False when any
            exception was raised (the error is logged).
        """
        try:
            # NOTE(review): this payload has no 'query' field, unlike the one
            # built in translate_text - confirm the endpoint accepts it.
            test_data = {
                'inputs': {'text': 'test'},
                'response_mode': 'blocking',
                'user': 'health_check'
            }

            response = self._make_request(
                method='POST',
                endpoint='/chat-messages',
                data=test_data
            )

            return response is not None

        except Exception as e:
            logger.error(f"Dify API connection test failed: {str(e)}")
            return False

    def get_app_info(self) -> Dict[str, Any]:
        """Fetch the application parameters from ``/parameters``.

        Returns:
            ``{'success': True, 'app_info': <response>}`` on success, or
            ``{'success': False, 'error': <message>}`` when the request fails.
        """
        try:
            response = self._make_request(
                method='GET',
                endpoint='/parameters'
            )

            return {
                'success': True,
                'app_info': response
            }

        except Exception as e:
            logger.error(f"Failed to get Dify app info: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    @classmethod
    def load_config_from_file(cls, file_path: str = 'api.txt'):
        """Load the Dify settings from a text file into ``current_app.config``.

        Lines starting with ``base_url:`` set ``DIFY_API_BASE_URL`` and lines
        starting with ``api:`` set ``DIFY_API_KEY``; the value is everything
        after the first colon, stripped. A missing file only logs a warning,
        and any other error is logged and swallowed.
        """
        try:
            from pathlib import Path

            config_file = Path(file_path)

            if not config_file.exists():
                logger.warning(f"Dify config file not found: {file_path}")
                return

            with open(config_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line.startswith('base_url:'):
                        # Split on the first colon only, so URLs stay intact.
                        base_url = line.split(':', 1)[1].strip()
                        current_app.config['DIFY_API_BASE_URL'] = base_url
                    elif line.startswith('api:'):
                        api_key = line.split(':', 1)[1].strip()
                        current_app.config['DIFY_API_KEY'] = api_key

            logger.info("Dify API config loaded from file")

        except Exception as e:
            logger.error(f"Failed to load Dify config from file: {str(e)}")
|
||||
|
||||
|
||||
def init_dify_config(app):
    """Load the Dify settings for ``app`` and log whether they are complete.

    Runs inside ``app.app_context()``: the settings are loaded through
    ``DifyClient.load_config_from_file()`` and the presence of the base URL
    and API key is then reported to the log.
    """
    with app.app_context():
        # Pull the settings from the default config file (api.txt).
        DifyClient.load_config_from_file()

        # Report whether both values are present.
        base_url = app.config.get('DIFY_API_BASE_URL')
        api_key = app.config.get('DIFY_API_KEY')

        if base_url and api_key:
            logger.info("Dify API configuration loaded successfully")
            return

        logger.warning("Dify API configuration is incomplete")
        logger.warning(f"Base URL: {'✓' if base_url else '✗'}")
        logger.warning(f"API Key: {'✓' if api_key else '✗'}")
|
864
app/services/document_processor.py
Normal file
864
app/services/document_processor.py
Normal file
@@ -0,0 +1,864 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
核心文檔處理邏輯 - 移植自最佳版本
|
||||
包含完整的 DOCX 文字提取和翻譯插入功能
|
||||
|
||||
Author: PANJIT IT Team
|
||||
Created: 2024-09-02
|
||||
Modified: 2024-09-02
|
||||
"""
|
||||
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Tuple, Optional, Any
|
||||
from docx.text.paragraph import Paragraph
|
||||
from docx.table import Table, _Cell
|
||||
from docx.shared import Pt
|
||||
from docx.oxml import OxmlElement
|
||||
from docx.oxml.ns import qn, nsdecls
|
||||
import docx
|
||||
|
||||
from app.utils.logger import get_logger
|
||||
from app.utils.exceptions import FileProcessingError
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# ---------- Constants ----------
|
||||
INSERT_FONT_SIZE_PT = 10
|
||||
SENTENCE_MODE = True
|
||||
|
||||
# ---------- Optional dependencies detection ----------
|
||||
try:
|
||||
import blingfire
|
||||
_HAS_BLINGFIRE = True
|
||||
except ImportError:
|
||||
_HAS_BLINGFIRE = False
|
||||
|
||||
try:
|
||||
import pysbd
|
||||
_HAS_PYSBD = True
|
||||
except ImportError:
|
||||
_HAS_PYSBD = False
|
||||
|
||||
# ---------- Helper functions ----------
|
||||
def _has_cjk(text: str) -> bool:
|
||||
"""Check if text contains CJK (Chinese/Japanese/Korean) characters."""
|
||||
for char in text:
|
||||
if '\u4e00' <= char <= '\u9fff' or \
|
||||
'\u3400' <= char <= '\u4dbf' or \
|
||||
'\u20000' <= char <= '\u2a6df' or \
|
||||
'\u3040' <= char <= '\u309f' or \
|
||||
'\u30a0' <= char <= '\u30ff' or \
|
||||
'\uac00' <= char <= '\ud7af':
|
||||
return True
|
||||
return False
|
||||
|
||||
def _normalize_text(text: str) -> str:
|
||||
"""Normalize text for comparison."""
|
||||
return re.sub(r'\s+', ' ', text.strip().lower())
|
||||
|
||||
def _append_after(p: Paragraph, text_block: str, italic: bool=True, font_size_pt: int=INSERT_FONT_SIZE_PT) -> Paragraph:
    """Create a paragraph right after ``p`` and fill it with ``text_block``.

    Each line of ``text_block`` becomes its own run, with a break after every
    line except the last. A final run holding a zero-width space (U+200B)
    marks the paragraph as one inserted by this module.

    Args:
        p: Paragraph after which the new one is inserted.
        text_block: Text to insert; ``"\\n"`` separates lines.
        italic: When true, every run is set to italic.
        font_size_pt: When truthy, the font size (in points) of every run.

    Returns:
        The new paragraph, so callers can chain further insertions after it.
    """
    def _styled_run(target, content):
        run = target.add_run(content)
        if italic:
            run.italic = True
        if font_size_pt:
            run.font.size = Pt(font_size_pt)
        return run

    element = OxmlElement("w:p")
    p._p.addnext(element)
    inserted = Paragraph(element, p._parent)

    segments = text_block.split("\n")
    final_index = len(segments) - 1
    for index, segment in enumerate(segments):
        run = _styled_run(inserted, segment)
        if index < final_index:
            run.add_break()

    # Zero-width marker that identifies this paragraph as our insertion.
    _styled_run(inserted, "\u200b")
    return inserted
|
||||
|
||||
def _is_our_insert_block(p: Paragraph) -> bool:
|
||||
"""Return True iff paragraph contains our zero-width marker."""
|
||||
return any("\u200b" in (r.text or "") for r in p.runs)
|
||||
|
||||
def _find_last_inserted_after(p: Paragraph, limit: int = 8) -> Optional[Paragraph]:
    """Return the last inserted paragraph in the run directly following ``p``.

    At most ``limit`` paragraphs after ``p`` are inspected, and the scan
    stops at the first paragraph that is not one of our insert blocks.

    Returns:
        The last consecutive insert block found, or None when there is none,
        when the parent of ``p`` has no ``paragraphs`` attribute, when ``p``
        is not found among its parent's paragraphs, or when any exception
        occurs.
    """
    try:
        parent = p._parent
        if not hasattr(parent, 'paragraphs'):
            # Parents without a paragraphs attribute cannot be scanned.
            return None
        siblings = list(parent.paragraphs)

        # Locate p among its siblings by comparing the underlying elements.
        anchor = next(
            (idx for idx, candidate in enumerate(siblings)
             if candidate._element == p._element),
            None,
        )
        if anchor is None:
            return None

        newest = None
        stop = min(anchor + 1 + limit, len(siblings))
        for idx in range(anchor + 1, stop):
            follower = siblings[idx]
            if not _is_our_insert_block(follower):
                break  # first paragraph that is not ours ends the run
            newest = follower
        return newest
    except Exception:
        return None
|
||||
|
||||
def _p_text_with_breaks(p: Paragraph) -> str:
|
||||
"""Extract text from paragraph with line breaks preserved."""
|
||||
parts = []
|
||||
for node in p._element.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
|
||||
tag = node.tag.split('}', 1)[-1]
|
||||
if tag == "t":
|
||||
parts.append(node.text or "")
|
||||
elif tag == "br":
|
||||
parts.append("\n")
|
||||
elif tag == "tab":
|
||||
parts.append("\t")
|
||||
return "".join(parts)
|
||||
|
||||
def _get_cell_full_text(cell) -> str:
    """Return the whole text of a table cell, across all of its paragraphs.

    Each paragraph is read with line breaks preserved and stripped; blank
    paragraphs are skipped and the rest are joined with newlines. If anything
    goes wrong, a warning is logged and an empty string is returned.
    """
    try:
        stripped = [_p_text_with_breaks(para).strip() for para in cell.paragraphs]
        # Keep only paragraphs that still have content after stripping.
        return '\n'.join(chunk for chunk in stripped if chunk)
    except Exception as e:
        logger.warning(f"提取儲存格文字失敗: {e}")
        return ""
|
||||
|
||||
def _is_our_insert_block_text(text: str) -> bool:
|
||||
"""檢查文字是否為翻譯插入區塊"""
|
||||
if not text:
|
||||
return False
|
||||
text_lower = text.lower().strip()
|
||||
return (
|
||||
text_lower.startswith('【') or
|
||||
text_lower.startswith('[翻譯') or
|
||||
'翻譯:' in text_lower or
|
||||
'translation:' in text_lower or
|
||||
text_lower.startswith('translated:') or
|
||||
"\u200b" in text
|
||||
)
|
||||
|
||||
def _is_our_insert_block(p: Paragraph) -> bool:
    """Tell whether the paragraph is a translation inserted by this module.

    Inserted paragraphs carry a zero-width space (U+200B) marker, so the
    check looks for that character in the paragraph's extracted text.
    """
    marker = "\u200b"
    return marker in _p_text_with_breaks(p)
|
||||
|
||||
def should_translate(text: str, src_lang: str) -> bool:
    """Decide whether ``text`` is worth sending to translation.

    Args:
        text: Candidate text; surrounding whitespace is ignored.
        src_lang: Source language code. ``auto`` and ``auto-detect``
            (case-insensitive) enable the stricter automatic rule.

    Returns:
        False for empty text and for text made only of digits, whitespace
        and the characters ``. - : /``. In automatic mode, True only if the
        text contains CJK characters or is longer than 5 characters.
        Otherwise True.
    """
    candidate = text.strip()

    # Any non-empty text qualifies: the minimum length is one character.
    if not candidate:
        return False

    # Pure numbers, dates, times and similar need no translation.
    if re.match(r'^[\d\s\.\-\:\/]+$', candidate):
        return False

    if src_lang.lower() not in ('auto', 'auto-detect'):
        return True

    # Automatic mode: require CJK content or a reasonably long string.
    return _has_cjk(candidate) or len(candidate) > 5
|
||||
|
||||
def _split_sentences(text: str, lang: str = 'auto') -> List[str]:
    """Split ``text`` into sentences with the best splitter available.

    Order of preference (the first one that yields any sentence wins):
    blingfire, then pysbd - both only when installed and ``SENTENCE_MODE`` is
    on - and finally a plain separator-based split. Failures of the optional
    libraries are logged and the next strategy is tried.

    Args:
        text: Text to split.
        lang: Language handed to pysbd; ``'auto'`` is mapped to ``'en'``.

    Returns:
        The list of sentences; empty when ``text`` is blank. The fallback
        splitter drops pieces of three characters or fewer.
    """
    if not text.strip():
        return []

    def _non_blank(candidates):
        return [item.strip() for item in candidates if item.strip()]

    # First choice: blingfire.
    if _HAS_BLINGFIRE and SENTENCE_MODE:
        try:
            found = _non_blank(blingfire.text_to_sentences(text).split('\n'))
            if found:
                return found
        except Exception as e:
            logger.warning(f"Blingfire failed: {e}")

    # Second choice: pysbd.
    if _HAS_PYSBD and SENTENCE_MODE:
        try:
            segmenter = pysbd.Segmenter(language="en" if lang == "auto" else lang)
            found = _non_blank(segmenter.segment(text))
            if found:
                return found
        except Exception as e:
            logger.warning(f"PySBD failed: {e}")

    # Last resort: split on each separator in turn, keeping the separator
    # (minus trailing whitespace) at the end of every piece it terminated.
    pieces = [text]
    for sep in ['. ', '。', '!', '?', '!', '?', '\n']:
        suffix = sep.rstrip()
        regrouped = []
        for piece in pieces:
            *heads, tail = piece.split(sep)
            if not heads:
                # Separator absent: keep the piece untouched.
                regrouped.append(piece)
                continue
            for head in heads:
                head = head.strip()
                if head:
                    regrouped.append(head + suffix)
            tail = tail.strip()
            if tail:
                regrouped.append(tail)
        pieces = regrouped

    return [piece for piece in pieces if len(piece.strip()) > 3]
|
||||
|
||||
# ---------- Segment class ----------
|
||||
class Segment:
    """A piece of translatable text together with where it came from."""

    def __init__(self, kind: str, ref: Any, ctx: str, text: str):
        # kind: segment type; this module creates 'para', 'table_cell' and 'txbx'
        # ref:  the document element the text was taken from
        # ctx:  human-readable location string
        # text: the text content itself
        self.kind, self.ref, self.ctx, self.text = kind, ref, ctx, text
|
||||
|
||||
# ---------- TextBox helpers ----------
|
||||
def _txbx_iter_texts(doc: docx.Document):
|
||||
"""
|
||||
Yield (txbxContent_element, joined_source_text)
|
||||
- Deeply collect all descendant <w:p> under txbxContent
|
||||
- Skip our inserted translations: contains zero-width or (all italic and no CJK)
|
||||
- Keep only lines that still have CJK
|
||||
"""
|
||||
def _p_text_flags(p_el):
|
||||
parts = []
|
||||
for node in p_el.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
|
||||
tag = node.tag.split('}', 1)[-1]
|
||||
if tag == "t":
|
||||
parts.append(node.text or "")
|
||||
elif tag == "br":
|
||||
parts.append("\n")
|
||||
else:
|
||||
parts.append(" ")
|
||||
text = "".join(parts)
|
||||
has_zero = ("\u200b" in text)
|
||||
runs = p_el.xpath(".//*[local-name()='r']")
|
||||
vis, ital = [], []
|
||||
for r in runs:
|
||||
rt = "".join([(t.text or "") for t in r.xpath(".//*[local-name()='t']")])
|
||||
if (rt or "").strip():
|
||||
vis.append(rt)
|
||||
ital.append(bool(r.xpath(".//*[local-name()='i']")))
|
||||
all_italic = (len(vis) > 0 and all(ital))
|
||||
return text, has_zero, all_italic
|
||||
|
||||
for tx in doc._element.xpath(".//*[local-name()='txbxContent']"):
|
||||
kept = []
|
||||
for p in tx.xpath(".//*[local-name()='p']"): # all descendant paragraphs
|
||||
text, has_zero, all_italic = _p_text_flags(p)
|
||||
if not (text or "").strip():
|
||||
continue
|
||||
if has_zero:
|
||||
continue # our inserted
|
||||
for line in text.split("\n"):
|
||||
if line.strip():
|
||||
kept.append(line.strip())
|
||||
if kept:
|
||||
joined = "\n".join(kept)
|
||||
yield tx, joined
|
||||
|
||||
def _txbx_append_paragraph(tx, text_block: str, italic: bool = True, font_size_pt: int = INSERT_FONT_SIZE_PT):
    """Append a new paragraph holding ``text_block`` to a text box.

    The paragraph consists of a single run: its properties come first
    (italic and size, when requested), then one text node per line with a
    break element between lines, and finally a text node holding the
    zero-width marker (U+200B) that identifies our insertions.

    Args:
        tx: The text box content element that receives the paragraph.
        text_block: Text to insert; ``"\\n"`` separates lines.
        italic: When true, the run is italic.
        font_size_pt: When truthy, font size in points (stored doubled in
            ``w:sz``).
    """
    def _text_node(content):
        node = OxmlElement("w:t")
        node.set(qn("xml:space"), "preserve")
        node.text = content
        return node

    run_props = OxmlElement("w:rPr")
    if italic:
        run_props.append(OxmlElement("w:i"))
    if font_size_pt:
        size = OxmlElement("w:sz")
        size.set(qn("w:val"), str(int(font_size_pt * 2)))
        run_props.append(size)

    run = OxmlElement("w:r")
    run.append(run_props)
    for index, line in enumerate(text_block.split("\n")):
        if index:
            run.append(OxmlElement("w:br"))
        run.append(_text_node(line))
    run.append(_text_node("\u200b"))

    paragraph = OxmlElement("w:p")
    paragraph.append(run)
    tx.append(paragraph)
|
||||
|
||||
def _txbx_tail_equals(tx, translations: List[str]) -> bool:
    """Check if the text box already ends with the expected translations.

    The last ``len(translations)`` direct child paragraphs of ``tx`` are
    compared, in order, with ``translations`` after normalisation.

    Paragraphs written by ``_txbx_append_paragraph`` end with a zero-width
    marker (U+200B). That character is not whitespace, so neither
    ``str.strip`` nor the ``\\s`` collapsing in ``_normalize_text`` removes
    it; it is therefore dropped explicitly before comparing. Without this
    the check could never match an inserted paragraph.

    Args:
        tx: The text box content element.
        translations: Expected translation texts, in insertion order.

    Returns:
        True when every tail paragraph matches its expected translation
        (vacuously True for an empty ``translations``); False otherwise.
    """
    marker = "\u200b"
    paras = tx.xpath("./*[local-name()='p']")
    if len(paras) < len(translations):
        return False
    if not translations:
        return True

    tail = paras[-len(translations):]
    for para_el, expected in zip(tail, translations):
        parts = []
        for node in para_el.xpath(".//*[local-name()='t' or local-name()='br']"):
            tag = node.tag.split("}", 1)[-1]
            parts.append("\n" if tag == "br" else (node.text or ""))
        actual = "".join(parts).replace(marker, "").strip()
        if _normalize_text(actual) != _normalize_text(expected.replace(marker, "")):
            return False
    return True
|
||||
|
||||
# ---------- Main extraction logic ----------
|
||||
def _get_paragraph_key(p: Paragraph) -> str:
    """Build the key used to de-duplicate paragraphs.

    The key combines the hash of the paragraph's XML (or of ``str()`` of the
    element when it has no ``xml`` attribute), the length of its text and
    the first 50 characters of that text. If building it fails, a simpler
    key based on the text alone is returned instead.
    """
    try:
        element = p._p
        serialized = element.xml if hasattr(element, 'xml') else str(element)
        body = _p_text_with_breaks(p)
        return f"{hash(serialized)}_{len(body)}_{body[:50]}"
    except Exception:
        # Fall back to a key derived from the text only.
        body = _p_text_with_breaks(p)
        return f"fallback_{hash(body)}_{len(body)}"
|
||||
|
||||
def _collect_docx_segments(doc: docx.Document) -> List[Segment]:
    """Collect every translatable segment of a DOCX document.

    The document body is walked recursively, handling paragraphs, tables
    (one segment per cell) and structured document tags (SDT: placeholder
    text, drop-down items and nested content). Text boxes are processed
    afterwards.

    Returns:
        The segments in the order they were found: body content first, then
        text boxes.
    """
    w_ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
    collected: List[Segment] = []
    known_keys = set()

    class SdtContentWrapper:
        """Minimal container exposing the attributes the walker relies on."""

        def __init__(self, element, parent):
            self._element = element
            self._parent = parent

    def _add_paragraph(p: Paragraph, ctx: str):
        try:
            key = _get_paragraph_key(p)
            if key in known_keys:
                return

            content = _p_text_with_breaks(p)
            if content.strip() and not _is_our_insert_block(p):
                collected.append(Segment("para", p, ctx, content))
                known_keys.add(key)
        except Exception as e:
            # A broken paragraph must not stop the whole collection.
            logger.warning(f"段落處理錯誤: {e}, 跳過此段落")

    def _add_table(table: Table, ctx: str):
        for r_idx, row in enumerate(table.rows, 1):
            for c_idx, cell in enumerate(row.cells, 1):
                cell_ctx = f"{ctx} > Tbl(r{r_idx},c{c_idx})"

                # Cells are extracted as a whole, not paragraph by paragraph.
                cell_text = _get_cell_full_text(cell)
                if cell_text.strip() and not _is_our_insert_block_text(cell_text):
                    collected.append(Segment("table_cell", cell, cell_ctx, cell_text))

    def _add_sdt_metadata(sdt_element, sdt_ctx: str):
        # Placeholder text of the content control.
        placeholder_parts = [
            t.text
            for t in sdt_element.xpath('.//w:placeholder//w:t', namespaces=w_ns)
            if t.text
        ]
        if placeholder_parts:
            placeholder = "".join(placeholder_parts).strip()
            if placeholder:
                collected.append(Segment("para", sdt_element, f"{sdt_ctx}-Placeholder", placeholder))

        # Display texts of drop-down list items, one per line.
        labels = []
        for item in sdt_element.xpath('.//w:dropDownList/w:listItem', namespaces=w_ns):
            label = item.get(qn('w:displayText'))
            if label:
                labels.append(label)
        if labels:
            collected.append(Segment("para", sdt_element, f"{sdt_ctx}-Dropdown", "\n".join(labels)))

    def _process_container_content(container, ctx: str):
        """Walk the direct children of a container (body or SDT content)."""
        if container._element is None:
            return

        for child in container._element:
            tag = child.tag

            if tag.endswith('}p'):  # paragraph
                _add_paragraph(Paragraph(child, container), ctx)

            elif tag.endswith('}tbl'):  # table
                _add_table(Table(child, container), ctx)

            elif tag.endswith('}sdt'):  # structured document tag
                sdt_ctx = f"{ctx} > SDT"

                # 1. Metadata text: placeholder and drop-down items.
                _add_sdt_metadata(child, sdt_ctx)

                # 2. Recurse into the actual content (sdtContent).
                sdt_content = child.find(qn('w:sdtContent'))
                if sdt_content is not None:
                    _process_container_content(SdtContentWrapper(sdt_content, container), sdt_ctx)

    # 1. Main document body.
    _process_container_content(doc._body, "Body")

    # 2. Text boxes.
    for tx, joined in _txbx_iter_texts(doc):
        if joined.strip() and (_has_cjk(joined) or should_translate(joined, 'auto')):
            collected.append(Segment("txbx", tx, "TextBox", joined))

    return collected
|
||||
|
||||
def _insert_docx_translations(doc: docx.Document, segs: List[Segment],
                              tmap: Dict[Tuple[str, str], str],
                              targets: List[str], log=lambda s: None) -> Tuple[int, int]:
    """
    Insert translations into DOCX document segments.

    A segment is processed when at least one target language has an entry in
    ``tmap`` under the key ``(target_language, source_text)``. Targets that are
    missing for such a segment get a visible placeholder so the output keeps
    one block per target language.

    Every inserted block ends with a zero-width-space run (``\\u200b``) that
    marks the paragraph as one of our insertions.

    Args:
        doc: The DOCX document object (not referenced directly; the segments
            carry references into it).
        segs: List of segments to translate.
        tmap: Translation map with keys as (target_language, source_text).
        targets: List of target languages in output order.
        log: Logging callback taking a single message string.

    Returns:
        Tuple of (successful_insertions, skipped_insertions), counted per segment.
    """
    ok_cnt = skip_cnt = 0

    def _add_formatted_run(p: Paragraph, text: str, italic: bool, font_size_pt: int):
        """Append ``text`` to ``p`` (one run per line, joined by breaks) plus the marker run."""
        lines = text.split("\n")
        for i, line in enumerate(lines):
            run = p.add_run(line)
            if italic:
                run.italic = True
            if font_size_pt:
                run.font.size = Pt(font_size_pt)
            if i < len(lines) - 1:
                run.add_break()
        # Zero-width space marker identifying the paragraph as our insertion.
        tag_run = p.add_run("\u200b")
        if italic:
            tag_run.italic = True
        if font_size_pt:
            tag_run.font.size = Pt(font_size_pt)

    def _already_inserted(existing: List[str], wanted: List[str]) -> bool:
        """Return True when the leading ``existing`` blocks equal ``wanted`` in order."""
        if len(existing) < len(wanted):
            return False
        return all(_normalize_text(e) == _normalize_text(t) for e, t in zip(existing, wanted))

    def _missing_translations(existing: List[str], wanted: List[str]) -> List[str]:
        """Return the entries of ``wanted`` that do not appear anywhere in ``existing``."""
        return [t for t in wanted
                if not any(_normalize_text(t) == _normalize_text(e) for e in existing)]

    for seg in segs:
        # Skip segments for which no target language has a translation at all.
        if not any((tgt, seg.text) in tmap for tgt in targets):
            log(f"[SKIP] 無翻譯結果: {seg.ctx} | {seg.text[:50]}...")
            skip_cnt += 1
            continue

        # One entry per target, with a placeholder for any missing language.
        translations = []
        for tgt in targets:
            if (tgt, seg.text) in tmap:
                translations.append(tmap[(tgt, seg.text)])
            else:
                log(f"[WARNING] 缺少 {tgt} 翻譯: {seg.text[:30]}...")
                translations.append(f"【翻譯查詢失敗|{tgt}】{seg.text[:50]}...")

        log(f"[INSERT] 準備插入 {len(translations)} 個翻譯到 {seg.ctx}: {seg.text[:30]}...")

        if seg.kind == "para":
            # SDT segments carry the raw <w:sdt> XML element instead of a Paragraph.
            if hasattr(seg.ref, 'tag') and seg.ref.tag.endswith('}sdt'):
                sdt_content = seg.ref.find(qn('w:sdtContent'))

                if sdt_content is None:
                    # Nothing to append to; never let the raw element reach the
                    # Paragraph-only code below.
                    log(f"[SKIP] SDT 缺少 sdtContent: {seg.text[:30]}...")
                    skip_cnt += 1
                    continue

                # Collect the text of blocks we inserted on a previous run.
                # iter() visits descendant <w:p> elements in document order and
                # works for both plain lxml and python-docx element classes.
                existing_texts = []
                for ep in sdt_content.iter(qn('w:p')):
                    p_obj = Paragraph(ep, None)
                    if _is_our_insert_block(p_obj):
                        existing_texts.append(_p_text_with_breaks(p_obj))

                if _already_inserted(existing_texts, translations):
                    skip_cnt += 1
                    log(f"[SKIP] SDT 已存在翻譯: {seg.text[:30]}...")
                    continue

                # Append only the translations that are not present yet.
                for t in _missing_translations(existing_texts, translations):
                    new_p_element = OxmlElement("w:p")
                    sdt_content.append(new_p_element)
                    new_p = Paragraph(new_p_element, None)
                    _add_formatted_run(new_p, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)

                ok_cnt += 1
                log("[SUCCESS] SDT 插入翻譯(交錯格式)")
                continue

            p: Paragraph = seg.ref

            # Context-aware insertion: paragraphs inside a table cell are handled
            # separately so translations stay inside the same cell.
            if isinstance(p._parent, _Cell):
                cell = p._parent

                try:
                    # Locate the paragraph inside its cell.
                    cell_paragraphs = list(cell.paragraphs)
                    p_index = -1
                    for idx, cell_p in enumerate(cell_paragraphs):
                        if cell_p._element == p._element:
                            p_index = idx
                            break

                    if p_index == -1:
                        log("[WARNING] 無法找到段落在單元格中的位置,使用原始方法")
                        # Fallback: append everything at the end of the cell.
                        for block in translations:
                            new_p = cell.add_paragraph()
                            _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                        ok_cnt += 1
                        continue

                    # Look only at the paragraphs right after the source paragraph.
                    existing_texts = []
                    check_limit = min(p_index + 1 + len(translations), len(cell_paragraphs))
                    for idx in range(p_index + 1, check_limit):
                        if _is_our_insert_block(cell_paragraphs[idx]):
                            existing_texts.append(_p_text_with_breaks(cell_paragraphs[idx]))

                    if _already_inserted(existing_texts, translations):
                        skip_cnt += 1
                        log(f"[SKIP] 表格單元格已存在翻譯: {seg.text[:30]}...")
                        continue

                    to_add = _missing_translations(existing_texts, translations)
                    if not to_add:
                        skip_cnt += 1
                        log(f"[SKIP] 表格單元格所有翻譯已存在: {seg.text[:30]}...")
                        continue

                    # Chain the new paragraphs directly after the source paragraph.
                    insert_after = p
                    for block in to_add:
                        try:
                            new_p_element = OxmlElement("w:p")
                            insert_after._element.addnext(new_p_element)
                            new_p = Paragraph(new_p_element, cell)
                            _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                            insert_after = new_p  # next block goes after this one
                        except Exception as e:
                            log(f"[ERROR] 表格插入失敗: {e}, 嘗試fallback方法")
                            # Fallback: add at the end of the cell.
                            try:
                                new_p = cell.add_paragraph()
                                _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                                log("[SUCCESS] Fallback插入成功")
                            except Exception as e2:
                                log(f"[FATAL] Fallback也失敗: {e2}")

                    ok_cnt += 1
                    log(f"[SUCCESS] 表格單元格插入 {len(to_add)} 個翻譯(緊接原文後)")

                except Exception as e:
                    log(f"[ERROR] 表格處理全面失敗: {e}, 跳過此段落")
                    continue

            else:
                # Normal paragraph (not in a table cell).
                # NOTE(review): the "already translated" check is deliberately
                # disabled here, so re-running on an output file inserts the
                # translations again. Confirm whether it should be restored.
                try:
                    log(f"[DEBUG] 強制插入翻譯到段落: {seg.text[:30]}...")

                    to_add = translations
                    anchor = p

                    for block in to_add:
                        try:
                            log(f"[DEBUG] 嘗試插入: {block[:50]}...")
                            anchor = _append_after(anchor, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                            log("[SUCCESS] _append_after成功插入")
                        except Exception as e:
                            log(f"[ERROR] _append_after失敗: {e}, 嘗試簡化插入")
                            try:
                                # Fallback: append to the paragraph's container.
                                if hasattr(p._parent, 'add_paragraph'):
                                    new_p = p._parent.add_paragraph()
                                    _add_formatted_run(new_p, block, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
                                    log("[SUCCESS] Fallback段落插入成功")
                                else:
                                    log("[ERROR] 無法進行fallback插入")
                            except Exception as e2:
                                log(f"[FATAL] Fallback也失敗: {e2}")

                    ok_cnt += 1
                    log(f"[SUCCESS] 段落強制插入 {len(to_add)} 個翻譯")

                except Exception as e:
                    log(f"[ERROR] 段落處理失敗: {e}, 跳過此段落")
                    continue

        elif seg.kind == "table_cell":
            # Whole-cell segment: seg.ref is the _Cell object.
            cell = seg.ref

            # Walk backwards to collect the run of our blocks at the end of the cell.
            existing_translations = []
            cell_paragraphs = list(cell.paragraphs)
            translation_start_index = len(cell_paragraphs)
            for i in range(len(cell_paragraphs) - 1, -1, -1):
                if _is_our_insert_block(cell_paragraphs[i]):
                    existing_translations.insert(0, _p_text_with_breaks(cell_paragraphs[i]))
                    translation_start_index = i
                else:
                    break

            if _already_inserted(existing_translations, translations):
                skip_cnt += 1
                log(f"[SKIP] 表格儲存格已存在翻譯: {seg.text[:30]}...")
                continue

            # Remove stale translation paragraphs before writing the new ones.
            for i in range(len(cell_paragraphs) - 1, translation_start_index - 1, -1):
                if _is_our_insert_block(cell_paragraphs[i]):
                    cell._element.remove(cell_paragraphs[i]._element)

            # A "simple" cell holds only the short source text and nothing else.
            cell_content = cell.text.strip()
            is_simple_cell = len(cell_content) <= 10 and cell_content == seg.text.strip()

            if is_simple_cell:
                # Rewrite the cell in place instead of adding paragraphs.
                log(f"[INFO] 簡單儲存格內容替換: '{seg.text.strip()}' -> '{translations[0] if translations else 'N/A'}'")

                for para in cell.paragraphs:
                    para.clear()

                first_para = cell.paragraphs[0] if cell.paragraphs else cell.add_paragraph()

                # Source text first, then each translation on its own line.
                first_para.add_run(seg.text.strip())
                for t in translations:
                    first_para.add_run('\n')
                    run_trans = first_para.add_run(t)
                    run_trans.italic = True
                    if INSERT_FONT_SIZE_PT:
                        run_trans.font.size = Pt(INSERT_FONT_SIZE_PT)

                # Marker run so the paragraph is recognised as ours later.
                tag_run = first_para.add_run("\u200b")
                tag_run.italic = True
                if INSERT_FONT_SIZE_PT:
                    tag_run.font.size = Pt(INSERT_FONT_SIZE_PT)
            else:
                # Complex cell: append one paragraph per translation.
                for t in translations:
                    new_p = cell.add_paragraph()
                    _add_formatted_run(new_p, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)

            ok_cnt += 1
            log(f"[SUCCESS] 表格儲存格插入 {len(translations)} 個翻譯")

        elif seg.kind == "txbx":
            tx = seg.ref
            # Skip when the textbox already ends with exactly these translations.
            if _txbx_tail_equals(tx, translations):
                skip_cnt += 1
                log(f"[SKIP] 文字框已存在翻譯: {seg.text[:30]}...")
                continue

            for t in translations:
                _txbx_append_paragraph(tx, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)

            ok_cnt += 1
            log(f"[SUCCESS] 文字框插入 {len(translations)} 個翻譯")

    return ok_cnt, skip_cnt
|
||||
|
||||
# ---------- Main DocumentProcessor class ----------
|
||||
class DocumentProcessor:
    """Enhanced document processor with complete DOCX handling capabilities."""

    def __init__(self):
        self.logger = logger

    def extract_docx_segments(self, file_path: str) -> List[Segment]:
        """Extract all translatable segments from a DOCX file.

        Raises:
            FileProcessingError: if the document cannot be opened or analysed.
        """
        try:
            doc = docx.Document(file_path)
            segments = _collect_docx_segments(doc)

            self.logger.info(f"Extracted {len(segments)} segments from {file_path}")
            for seg in segments[:5]:  # Log first 5 segments for debugging
                self.logger.debug(f"Segment: {seg.kind} | {seg.ctx} | {seg.text[:50]}...")

            return segments

        except Exception as e:
            self.logger.error(f"Failed to extract DOCX segments from {file_path}: {str(e)}")
            raise FileProcessingError(f"DOCX 文件分析失敗: {str(e)}") from e

    @staticmethod
    def _segment_key(seg: Segment) -> Tuple[str, str, str]:
        """Return the (kind, ctx, normalized text) identity used to pair segments."""
        return (seg.kind, seg.ctx, _normalize_text(seg.text))

    def _rematch_segments_to_document(self, doc: docx.Document, old_segments: List[Segment]) -> List[Segment]:
        """Re-match segments from an old document instance to ``doc``.

        Segments are paired by kind, context and normalized text. When several
        segments share the same key (repeated text), they are paired in
        document order so each occurrence gets its own fresh segment; only
        when the fresh document has fewer occurrences is the first one reused.
        Unmatched old segments are kept as-is (they may not work for insertion).
        On any error the original list is returned unchanged.
        """
        try:
            # Extract fresh segments from the current document instance.
            fresh_segments = _collect_docx_segments(doc)

            # key -> fresh segments with that key, in document order.
            candidates: Dict[Tuple[str, str, str], List[Segment]] = {}
            for fresh_seg in fresh_segments:
                candidates.setdefault(self._segment_key(fresh_seg), []).append(fresh_seg)

            # key -> index of the next not-yet-used candidate.
            next_unused: Dict[Tuple[str, str, str], int] = {}
            matched_segments = []

            for old_seg in old_segments:
                key = self._segment_key(old_seg)
                pool = candidates.get(key)

                if not pool:
                    self.logger.warning(f"Failed to match segment: {old_seg.text[:50]}...")
                    # Still add the old segment but it might not work for insertion.
                    matched_segments.append(old_seg)
                    continue

                position = next_unused.get(key, 0)
                if position < len(pool):
                    matched_segments.append(pool[position])
                    next_unused[key] = position + 1
                else:
                    # More old duplicates than fresh ones: reuse the first match.
                    matched_segments.append(pool[0])

            self.logger.debug(f"Re-matched {len(matched_segments)} segments to current document")
            return matched_segments

        except Exception as e:
            self.logger.error(f"Failed to re-match segments: {str(e)}")
            # Return original segments as fallback.
            return old_segments

    def _apply_translations(self, file_path: str, segments: List[Segment],
                            translation_map: Dict[Tuple[str, str], str],
                            target_languages: List[str], output_path: str) -> Tuple[int, int]:
        """Load ``file_path``, insert the translations and save to ``output_path``."""
        doc = docx.Document(file_path)

        # The given segments were extracted from a different document instance,
        # so they must be re-bound to the instance that will be saved.
        matched_segments = self._rematch_segments_to_document(doc, segments)

        ok_count, skip_count = _insert_docx_translations(
            doc, matched_segments, translation_map, target_languages, self.logger.debug
        )

        doc.save(output_path)
        return ok_count, skip_count

    def insert_docx_translations(self, file_path: str, segments: List[Segment],
                                 translation_map: Dict[Tuple[str, str], str],
                                 target_languages: List[str], output_path: str) -> Tuple[int, int]:
        """Insert translations into DOCX file and save to output path.

        Returns:
            Tuple of (inserted, skipped) segment counts.

        Raises:
            FileProcessingError: if loading, inserting or saving fails.
        """
        try:
            ok_count, skip_count = self._apply_translations(
                file_path, segments, translation_map, target_languages, output_path
            )
            self.logger.info(f"Inserted {ok_count} translations, skipped {skip_count}. Saved to: {output_path}")
            return ok_count, skip_count

        except Exception as e:
            self.logger.error(f"Failed to insert DOCX translations: {str(e)}")
            raise FileProcessingError(f"DOCX 翻譯插入失敗: {str(e)}") from e

    def split_text_into_sentences(self, text: str, language: str = 'auto') -> List[str]:
        """Split text into sentences using the best available method."""
        return _split_sentences(text, language)

    def should_translate_text(self, text: str, source_language: str) -> bool:
        """Determine if text should be translated."""
        return should_translate(text, source_language)

    def insert_docx_combined_translations(self, file_path: str, segments: List[Segment],
                                          translation_map: Dict[Tuple[str, str], str],
                                          target_languages: List[str], output_path: str) -> Tuple[int, int]:
        """Insert all translations into a single combined multi-language DOCX file.

        Each original text is followed by all of its translations, in the
        order of ``target_languages``. The insertion itself is the same as
        :meth:`insert_docx_translations`; only the logging and error message differ.

        Raises:
            FileProcessingError: if loading, inserting or saving fails.
        """
        try:
            ok_count, skip_count = self._apply_translations(
                file_path, segments, translation_map, target_languages, output_path
            )
            self.logger.info(f"Generated combined multi-language file: {output_path}")
            self.logger.info(f"Inserted {ok_count} translations, skipped {skip_count}")
            return ok_count, skip_count

        except Exception as e:
            self.logger.error(f"Failed to create combined DOCX translations: {str(e)}")
            raise FileProcessingError(f"組合多語言 DOCX 檔案生成失敗: {str(e)}") from e
|
645
app/services/notification_service.py
Normal file
645
app/services/notification_service.py
Normal file
@@ -0,0 +1,645 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
通知服務
|
||||
|
||||
Author: PANJIT IT Team
|
||||
Created: 2024-01-28
|
||||
Modified: 2024-01-28
|
||||
"""
|
||||
|
||||
import os
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, List, Dict, Any
|
||||
from flask import current_app, url_for
|
||||
from app import db
|
||||
from app.utils.logger import get_logger
|
||||
from app.models.job import TranslationJob
|
||||
from app.models.user import User
|
||||
from app.models.notification import Notification, NotificationType
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class NotificationService:
|
||||
"""通知服務"""
|
||||
|
||||
def __init__(self):
    """Read the SMTP and application settings from the current Flask app config."""
    cfg = current_app.config

    # Server endpoint and transport security
    self.smtp_server = cfg.get('SMTP_SERVER')
    self.smtp_port = cfg.get('SMTP_PORT', 587)
    self.use_tls = cfg.get('SMTP_USE_TLS', False)
    self.use_ssl = cfg.get('SMTP_USE_SSL', False)

    # Sender identity and optional authentication
    self.auth_required = cfg.get('SMTP_AUTH_REQUIRED', False)
    self.sender_email = cfg.get('SMTP_SENDER_EMAIL')
    self.sender_password = cfg.get('SMTP_SENDER_PASSWORD', '')

    # Display name used in the From header and in mail bodies
    self.app_name = cfg.get('APP_NAME', 'PANJIT Document Translator')
|
||||
|
||||
def _create_smtp_connection(self):
    """Open an SMTP connection according to the configured settings.

    Uses implicit SSL when ``use_ssl`` is set; otherwise a plain connection
    that is upgraded with STARTTLS when ``use_tls`` is set. Logs in only when
    authentication is required and a password is configured.

    Returns:
        The connected ``smtplib.SMTP`` object, or ``None`` when connecting,
        upgrading or logging in fails (the error is logged).
    """
    server = None
    try:
        if self.use_ssl:
            server = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
        else:
            server = smtplib.SMTP(self.smtp_server, self.smtp_port)
            if self.use_tls:
                server.starttls()

        if self.auth_required and self.sender_password:
            server.login(self.sender_email, self.sender_password)

        return server
    except Exception as e:
        logger.error(f"SMTP connection failed: {str(e)}")
        # The socket may already be open (e.g. STARTTLS or login failed);
        # release it instead of leaking the connection.
        if server is not None:
            try:
                server.close()
            except OSError:
                # Best-effort cleanup; the original failure is already logged.
                pass
        return None
|
||||
|
||||
def _send_email(self, to_email: str, subject: str, html_content: str, text_content: str = None) -> bool:
    """Send one multipart/alternative e-mail.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        html_content: HTML body (always attached).
        text_content: Optional plain-text body, attached before the HTML part.

    Returns:
        True when the message was handed to the SMTP server, False when the
        SMTP configuration is incomplete, the connection could not be opened,
        or sending failed (the error is logged).
    """
    try:
        if not self.smtp_server or not self.sender_email:
            logger.error("SMTP configuration incomplete")
            return False

        # Build the message.
        msg = MIMEMultipart('alternative')
        msg['From'] = f"{self.app_name} <{self.sender_email}>"
        msg['To'] = to_email
        msg['Subject'] = subject

        # Plain-text alternative first, HTML last (the last part is preferred).
        if text_content:
            msg.attach(MIMEText(text_content, 'plain', 'utf-8'))
        msg.attach(MIMEText(html_content, 'html', 'utf-8'))

        server = self._create_smtp_connection()
        if not server:
            return False

        try:
            server.send_message(msg)
        finally:
            # Always release the connection, even when sending failed.
            try:
                server.quit()
            except (smtplib.SMTPException, OSError):
                # QUIT failed (e.g. server already dropped us): just close the socket.
                server.close()

        logger.info(f"Email sent successfully to {to_email}")
        return True

    except Exception as e:
        logger.error(f"Failed to send email to {to_email}: {str(e)}")
        return False
|
||||
|
||||
def send_job_completion_notification(self, job: TranslationJob) -> bool:
    """Send the "translation completed" e-mail to the job's owner.

    Values that originate from the user (file name, display name, languages)
    are HTML-escaped before being placed into the HTML body; the plain-text
    body uses them verbatim.

    Args:
        job: The finished translation job.

    Returns:
        True when the mail was sent, False when the job has no recipient
        address or sending failed (the error is logged).
    """
    from html import escape

    try:
        if not job.user or not job.user.email:
            logger.warning(f"No email address for job {job.job_uuid}")
            return False

        subject = f"📄 翻譯完成通知 - {job.original_filename}"

        # Human-readable processing time; stays empty when timestamps are missing.
        processing_time = ""
        if job.processing_started_at and job.completed_at:
            duration = job.completed_at - job.processing_started_at
            total_seconds = int(duration.total_seconds())

            if total_seconds < 60:
                processing_time = f"{total_seconds}秒"
            elif total_seconds < 3600:
                minutes, seconds = divmod(total_seconds, 60)
                processing_time = f"{minutes}分{seconds}秒"
            else:
                hours, remainder = divmod(total_seconds, 3600)
                processing_time = f"{hours}小時{remainder // 60}分"

        # A job without target languages must not abort the notification.
        languages = [str(lang) for lang in (job.target_languages or [])]
        languages_text = ', '.join(languages)
        languages_html = escape(languages_text)

        # Download section (simplified: one line per language, no real links).
        download_html = '<br>'.join(f"• {escape(lang)}: [下載翻譯檔案]" for lang in languages)

        completed_at_text = job.completed_at.strftime('%Y-%m-%d %H:%M:%S') if job.completed_at else '未知'
        cost_html = f'<p><strong>總成本:</strong> ${job.total_cost:.4f}</p>' if job.total_cost else ''
        sent_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Escaped copies for the HTML body only.
        user_name_html = escape(str(job.user.display_name))
        filename_html = escape(str(job.original_filename))
        source_language_html = escape(str(job.source_language))

        html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background-color: #2563eb; color: white; padding: 20px; text-align: center; border-radius: 8px 8px 0 0; }}
.content {{ background-color: #f8fafc; padding: 30px; border: 1px solid #e5e7eb; }}
.info-box {{ background-color: #dbeafe; border-left: 4px solid #2563eb; padding: 15px; margin: 20px 0; }}
.footer {{ background-color: #374151; color: #d1d5db; padding: 15px; text-align: center; font-size: 12px; border-radius: 0 0 8px 8px; }}
.success {{ color: #059669; font-weight: bold; }}
.download-section {{ margin: 20px 0; }}
.download-link {{ display: inline-block; background-color: #2563eb; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px; margin: 5px; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎉 翻譯任務完成</h1>
</div>

<div class="content">
<p>親愛的 <strong>{user_name_html}</strong>,</p>

<p class="success">您的文件翻譯任務已成功完成!</p>

<div class="info-box">
<h3>📋 任務詳細資訊</h3>
<p><strong>檔案名稱:</strong> {filename_html}</p>
<p><strong>任務編號:</strong> {job.job_uuid}</p>
<p><strong>來源語言:</strong> {source_language_html}</p>
<p><strong>目標語言:</strong> {languages_html}</p>
<p><strong>處理時間:</strong> {processing_time}</p>
<p><strong>完成時間:</strong> {completed_at_text}</p>
{cost_html}
</div>

<div class="download-section">
<h3>📥 下載翻譯檔案</h3>
<p>請登入系統下載您的翻譯檔案:</p>
<p>{download_html}</p>
<p style="margin-top: 15px;">
<strong>注意:</strong> 翻譯檔案將在系統中保留 7 天,請及時下載。
</p>
</div>

<div style="margin-top: 30px; padding-top: 20px; border-top: 1px solid #e5e7eb;">
<p>感謝您使用 {self.app_name}!</p>
<p>如有任何問題,請聯繫系統管理員。</p>
</div>
</div>

<div class="footer">
<p>此郵件由 {self.app_name} 系統自動發送,請勿回覆。</p>
<p>發送時間: {sent_at}</p>
</div>
</div>
</body>
</html>
"""

        # Plain-text alternative.
        text_content = f"""
翻譯任務完成通知

親愛的 {job.user.display_name},

您的文件翻譯任務已成功完成!

任務詳細資訊:
- 檔案名稱: {job.original_filename}
- 任務編號: {job.job_uuid}
- 來源語言: {job.source_language}
- 目標語言: {languages_text}
- 處理時間: {processing_time}
- 完成時間: {completed_at_text}

請登入系統下載您的翻譯檔案。翻譯檔案將在系統中保留 7 天。

感謝您使用 {self.app_name}!

----
此郵件由系統自動發送,請勿回覆。
"""

        return self._send_email(job.user.email, subject, html_content, text_content)

    except Exception as e:
        logger.error(f"Failed to send completion notification for job {job.job_uuid}: {str(e)}")
        return False
|
||||
|
||||
def send_job_failure_notification(self, job: TranslationJob) -> bool:
    """Send the "translation failed" e-mail to the job's owner.

    The file name, display name and error message are HTML-escaped before
    being placed into the HTML body (an error message can contain arbitrary
    text, including markup); the plain-text body uses them verbatim.

    Args:
        job: The failed translation job.

    Returns:
        True when the mail was sent, False when the job has no recipient
        address or sending failed (the error is logged).
    """
    from html import escape

    try:
        if not job.user or not job.user.email:
            logger.warning(f"No email address for job {job.job_uuid}")
            return False

        subject = f"⚠️ 翻譯失敗通知 - {job.original_filename}"

        error_text = job.error_message or '未知錯誤'
        now_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Escaped copies for the HTML body only.
        user_name_html = escape(str(job.user.display_name))
        filename_html = escape(str(job.original_filename))
        error_html = escape(str(error_text))

        html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background-color: #dc2626; color: white; padding: 20px; text-align: center; border-radius: 8px 8px 0 0; }}
.content {{ background-color: #f8fafc; padding: 30px; border: 1px solid #e5e7eb; }}
.error-box {{ background-color: #fef2f2; border-left: 4px solid #dc2626; padding: 15px; margin: 20px 0; }}
.footer {{ background-color: #374151; color: #d1d5db; padding: 15px; text-align: center; font-size: 12px; border-radius: 0 0 8px 8px; }}
.error {{ color: #dc2626; font-weight: bold; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>❌ 翻譯任務失敗</h1>
</div>

<div class="content">
<p>親愛的 <strong>{user_name_html}</strong>,</p>

<p class="error">很抱歉,您的文件翻譯任務處理失敗。</p>

<div class="error-box">
<h3>📋 任務資訊</h3>
<p><strong>檔案名稱:</strong> {filename_html}</p>
<p><strong>任務編號:</strong> {job.job_uuid}</p>
<p><strong>重試次數:</strong> {job.retry_count}</p>
<p><strong>錯誤訊息:</strong> {error_html}</p>
<p><strong>失敗時間:</strong> {now_text}</p>
</div>

<div style="margin-top: 20px;">
<p><strong>建議處理方式:</strong></p>
<ul>
<li>檢查檔案格式是否正確</li>
<li>確認檔案沒有損壞</li>
<li>稍後再次嘗試上傳</li>
<li>如問題持續,請聯繫系統管理員</li>
</ul>
</div>

<div style="margin-top: 30px; padding-top: 20px; border-top: 1px solid #e5e7eb;">
<p>如需協助,請聯繫系統管理員。</p>
</div>
</div>

<div class="footer">
<p>此郵件由 {self.app_name} 系統自動發送,請勿回覆。</p>
<p>發送時間: {now_text}</p>
</div>
</div>
</body>
</html>
"""

        # Plain-text alternative.
        text_content = f"""
翻譯任務失敗通知

親愛的 {job.user.display_name},

很抱歉,您的文件翻譯任務處理失敗。

任務資訊:
- 檔案名稱: {job.original_filename}
- 任務編號: {job.job_uuid}
- 重試次數: {job.retry_count}
- 錯誤訊息: {error_text}

建議處理方式:
1. 檢查檔案格式是否正確
2. 確認檔案沒有損壞
3. 稍後再次嘗試上傳
4. 如問題持續,請聯繫系統管理員

如需協助,請聯繫系統管理員。

----
此郵件由 {self.app_name} 系統自動發送,請勿回覆。
"""

        return self._send_email(job.user.email, subject, html_content, text_content)

    except Exception as e:
        logger.error(f"Failed to send failure notification for job {job.job_uuid}: {str(e)}")
        return False
|
||||
|
||||
def send_admin_notification(self, subject: str, message: str, admin_emails: List[str] = None) -> bool:
    """Send an HTML notification mail to the administrators.

    Args:
        subject: Notification subject; also used as the heading in the body.
        message: Notification text placed into the body as-is.
        admin_emails: Recipients. When empty or None, the addresses of all
            users returned by ``User.get_admin_users()`` are used.

    Returns:
        True when at least one mail was sent; False when there are no
        recipients, every send failed, or an error occurred (logged).
    """
    # NOTE(review): subject and message are inserted into the HTML unescaped;
    # confirm that callers never pass untrusted text here.
    try:
        recipients = admin_emails
        if not recipients:
            # Fall back to every admin account that has an address.
            recipients = [admin.email for admin in User.get_admin_users() if admin.email]

        if not recipients:
            logger.warning("No admin email addresses found")
            return False

        html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background-color: #f59e0b; color: white; padding: 20px; text-align: center; border-radius: 8px 8px 0 0; }}
.content {{ background-color: #f8fafc; padding: 30px; border: 1px solid #e5e7eb; }}
.footer {{ background-color: #374151; color: #d1d5db; padding: 15px; text-align: center; font-size: 12px; border-radius: 0 0 8px 8px; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🔔 系統管理通知</h1>
</div>

<div class="content">
<p>系統管理員您好,</p>

<div style="background-color: #fef3c7; border-left: 4px solid #f59e0b; padding: 15px; margin: 20px 0;">
<h3>{subject}</h3>
<p>{message}</p>
</div>

<p>發送時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>

<div class="footer">
<p>此郵件由 {self.app_name} 系統自動發送,請勿回覆。</p>
</div>
</div>
</body>
</html>
"""

        # Every recipient is attempted; count the successful deliveries.
        mail_subject = f"[管理通知] {subject}"
        delivered = sum(
            1 for address in recipients
            if self._send_email(address, mail_subject, html_content)
        )
        return delivered > 0

    except Exception as e:
        logger.error(f"Failed to send admin notification: {str(e)}")
        return False
|
||||
|
||||
def test_smtp_connection(self) -> bool:
    """Check that an SMTP connection can be opened; the connection is closed again.

    Returns:
        True when a connection was established, False otherwise.
    """
    try:
        connection = self._create_smtp_connection()
        if not connection:
            return False
        connection.quit()
        return True
    except Exception as e:
        logger.error(f"SMTP connection test failed: {str(e)}")
        return False
|
||||
|
||||
# ========== 資料庫通知方法 ==========
|
||||
|
||||
def create_db_notification(
    self,
    user_id: int,
    title: str,
    message: str,
    notification_type: NotificationType = NotificationType.INFO,
    job_uuid: Optional[str] = None,
    extra_data: Optional[Dict[str, Any]] = None,
    expires_at: Optional[datetime] = None,
    link: Optional[str] = None
) -> Optional[Notification]:
    """
    Create and persist a database notification, then push it over WebSocket.

    The WebSocket push is best-effort: once the notification has been
    committed, a push failure is logged but the notification is still
    returned, so callers do not mistake a stored notification for a failure.

    Args:
        user_id: ID of the user to notify.
        title: Notification title.
        message: Notification body.
        notification_type: Notification type.
        job_uuid: UUID of the related job, if any.
        extra_data: Additional data stored with the notification.
        expires_at: Expiry time.
        link: Related link; defaults to ``/job/<job_uuid>`` when a job UUID
            is given and no link is supplied.

    Returns:
        The created notification, or None when it could not be stored.
    """
    try:
        # Derive the job detail link when only the job UUID is known.
        if not link and job_uuid:
            link = f"/job/{job_uuid}"

        notification = Notification(
            user_id=user_id,
            type=notification_type.value,
            title=title,
            message=message,
            job_uuid=job_uuid,
            link=link,
            extra_data=extra_data,
            expires_at=expires_at
        )

        db.session.add(notification)
        db.session.commit()

    except Exception as e:
        db.session.rollback()
        logger.error(f"創建資料庫通知失敗: {e}")
        return None

    # The row is committed from here on; nothing below may roll it back or
    # hide it from the caller.
    try:
        logger.info(f"資料庫通知已創建: {notification.notification_uuid} for user {user_id}")
        self._send_websocket_notification(notification)
    except Exception as e:
        logger.warning(f"WebSocket 通知推送失敗: {e}")

    return notification
|
||||
|
||||
def send_job_started_db_notification(self, job: TranslationJob) -> Optional[Notification]:
    """
    Create the "job started processing" database notification.

    Args:
        job: The translation job that has started.

    Returns:
        The created notification, or None on failure.
    """
    try:
        parts = [f'您的文件「{job.original_filename}」已開始翻譯處理。']
        if job.target_languages:
            parts.append(f" 目標語言: {', '.join(job.target_languages)}")

        started = job.processing_started_at
        payload = {
            'filename': job.original_filename,
            'target_languages': job.target_languages,
            'started_at': started.isoformat() if started else None
        }

        return self.create_db_notification(
            user_id=job.user_id,
            title="翻譯任務開始處理",
            message="".join(parts),
            notification_type=NotificationType.INFO,
            job_uuid=job.job_uuid,
            extra_data=payload
        )

    except Exception as e:
        logger.error(f"發送任務開始資料庫通知失敗: {e}")
        return None
|
||||
|
||||
def send_job_completion_db_notification(self, job: TranslationJob) -> Optional[Notification]:
    """
    Create the "job completed" database notification.

    Nothing is created unless the job status is ``'COMPLETED'``.

    Args:
        job: The translation job.

    Returns:
        The created notification, or None when skipped or on failure.
    """
    try:
        if job.status != 'COMPLETED':
            logger.warning(f"任務 {job.job_uuid} 狀態不是已完成,跳過完成通知")
            return None

        parts = [f'您的文件「{job.original_filename}」已成功翻譯完成。']

        # Target languages, when known.
        if job.target_languages:
            parts.append(f" 目標語言: {', '.join(job.target_languages)}")

        # Processing time: whole minutes, or seconds for sub-minute jobs.
        if job.processing_started_at and job.completed_at:
            elapsed = (job.completed_at - job.processing_started_at).total_seconds()
            whole_minutes = int(elapsed / 60)
            if whole_minutes > 0:
                parts.append(f" 處理時間: {whole_minutes} 分鐘")
            else:
                parts.append(f" 處理時間: {int(elapsed)} 秒")

        payload = {
            'filename': job.original_filename,
            'target_languages': job.target_languages,
            'total_cost': float(job.total_cost) if job.total_cost else 0,
            'completed_at': job.completed_at.isoformat() if job.completed_at else None
        }

        return self.create_db_notification(
            user_id=job.user_id,
            title="翻譯任務完成",
            message="".join(parts),
            notification_type=NotificationType.SUCCESS,
            job_uuid=job.job_uuid,
            extra_data=payload
        )

    except Exception as e:
        logger.error(f"發送任務完成資料庫通知失敗: {e}")
        return None
|
||||
|
||||
def send_job_completion_db_notification_direct(self, job: TranslationJob) -> Optional[Notification]:
    """
    Create the job-completed database notification without checking the job status.

    Unlike ``send_job_completion_db_notification`` this variant does not
    inspect ``job.status``; it always builds the success message, which
    also tells the user the result can be downloaded from the job list.

    Args:
        job: Translation job the notification refers to.

    Returns:
        Notification: The value returned by ``create_db_notification``, or
        ``None`` when an exception was raised.
    """
    try:
        title = "翻譯任務完成"
        # Collect the message pieces and join them once at the end.
        fragments = [f'您的文件「{job.original_filename}」已成功翻譯完成。']

        # Optional part: target languages.
        if job.target_languages:
            languages = ', '.join(job.target_languages)
            fragments.append(f" 目標語言: {languages}")

        # Always close with the download hint.
        fragments.append(" 您可以在任務列表中下載翻譯結果。")
        message = ''.join(fragments)

        # Structured payload stored alongside the human-readable message.
        details = {
            'filename': job.original_filename,
            'target_languages': job.target_languages,
            'total_cost': float(job.total_cost) if job.total_cost else 0,
            'completed_at': job.completed_at.isoformat() if job.completed_at else None,
        }

        return self.create_db_notification(
            user_id=job.user_id,
            title=title,
            message=message,
            notification_type=NotificationType.SUCCESS,
            job_uuid=job.job_uuid,
            extra_data=details,
        )

    except Exception as e:
        # Best effort: a notification failure is logged, never propagated.
        logger.error(f"發送任務完成資料庫通知失敗: {e}")
        return None
|
||||
|
||||
def send_job_failure_db_notification(self, job: TranslationJob, error_message: Optional[str] = None) -> Optional[Notification]:
    """
    Create the database notification announcing that a translation job failed.

    The message names the original file and appends the error text and the
    number of retries when they are available.

    Args:
        job: Translation job the notification refers to.
        error_message: Optional error text to include in the message and
            in the notification's extra data.

    Returns:
        Notification: The value returned by ``create_db_notification``, or
        ``None`` when an exception was raised.
    """
    try:
        title = "翻譯任務失敗"
        message = f'您的文件「{job.original_filename}」翻譯失敗。'

        if error_message:
            message += f" 錯誤訊息: {error_message}"

        # Treat an unset retry counter (None) as zero. Comparing None with
        # an int raises TypeError, which the handler below would swallow,
        # and the user would never be told that the job failed.
        retry_count = job.retry_count or 0
        if retry_count > 0:
            message += f" 已重試 {retry_count} 次。"

        return self.create_db_notification(
            user_id=job.user_id,
            title=title,
            message=message,
            notification_type=NotificationType.ERROR,
            job_uuid=job.job_uuid,
            extra_data={
                'filename': job.original_filename,
                'error_message': error_message,
                # Stored as found on the job (may be None).
                'retry_count': job.retry_count,
                # Timestamp of notification creation, not read from the job.
                'failed_at': datetime.now().isoformat(),
            },
        )

    except Exception as e:
        # Best effort: a notification failure is logged, never propagated.
        logger.error(f"發送任務失敗資料庫通知失敗: {e}")
        return None
|
||||
|
||||
def _send_websocket_notification(self, notification: Notification):
    """
    Push a notification to its owner over WebSocket.

    Any failure (including a failed import of the WebSocket module) is
    logged and swallowed, so the caller is never interrupted.

    Args:
        notification: Notification object to push; its ``user_id`` selects
            the recipient and ``to_dict()`` supplies the payload.
    """
    try:
        # Imported lazily inside the try so an import failure is also
        # handled by the except clause below.
        from app.websocket import send_notification_to_user

        recipient_id = notification.user_id
        payload = notification.to_dict()
        send_notification_to_user(recipient_id, payload)
    except Exception as e:
        logger.error(f"WebSocket 推送通知失敗: {e}")
|
||||
|
||||
def get_unread_count(self, user_id: int) -> int:
    """
    Count a user's unread notifications that have not expired.

    A notification counts when ``is_read`` is false and its ``expires_at``
    is either NULL or later than the current local time.

    Args:
        user_id: ID of the user whose notifications are counted.

    Returns:
        int: Number of unread, unexpired notifications; ``0`` when the
        query raises.
    """
    try:
        # Step 1: restrict to this user's unread notifications.
        unread = Notification.query.filter_by(
            user_id=user_id,
            is_read=False,
        )

        # Step 2: keep rows with no expiry or an expiry still in the future.
        never_expires = Notification.expires_at.is_(None)
        still_valid = Notification.expires_at > datetime.now()

        return unread.filter(never_expires | still_valid).count()
    except Exception as e:
        logger.error(f"獲取未讀通知數量失敗: {e}")
        return 0
|
1227
app/services/translation_service.py
Normal file
1227
app/services/translation_service.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user