"""
Tool_OCR - Translation Service
Document translation using DIFY AI API with batch processing
"""

import copy
import json
import logging
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

from app.schemas.translation import (
    TranslatableItem,
    TranslatedItem,
    TranslationJobState,
    TranslationProgress,
    TranslationStatusEnum,
)
from app.services.dify_client import (
    DifyClient,
    DifyTranslationError,
    get_dify_client,
    MAX_BATCH_CHARS,
    MAX_BATCH_ITEMS,
)

logger = logging.getLogger(__name__)

# Element types that should be translated
TRANSLATABLE_TEXT_TYPES = {'text', 'title', 'header', 'footer', 'paragraph', 'footnote'}
TABLE_TYPE = 'table'
SKIP_TYPES = {'page_number', 'image', 'chart', 'logo', 'reference'}


def apply_translations(
    result_json: Dict,
    translations: Dict[str, Any]
) -> Dict:
    """
    Apply translations to a result JSON document, creating a translated copy.

    The input document is deep-copied; only the 'content' of matching
    elements is replaced, every other property is left as it was.

    Args:
        result_json: Original document JSON data (dict with a 'pages' list,
            each page holding an 'elements' list)
        translations: Translation dict mapping element_id to translated content.
            For text elements: element_id -> translated_string
            For tables: element_id -> {"cells": [{"row": int, "col": int, "content": str}]}

    Returns:
        A deep copy of result_json with translations applied
    """
    translated_doc = copy.deepcopy(result_json)
    applied_count = 0

    for page in translated_doc.get('pages', []):
        for elem in page.get('elements', []):
            elem_id = elem.get('element_id', '')
            elem_type = elem.get('type', '')

            # An element without an ID cannot be matched unambiguously: every
            # such element would share the '' key and receive the same text.
            if not elem_id or elem_id not in translations:
                continue

            translation = translations[elem_id]

            # Handle text elements (string translation)
            if isinstance(translation, str):
                if elem_type in TRANSLATABLE_TEXT_TYPES:
                    elem['content'] = translation
                    applied_count += 1
                else:
                    logger.warning(
                        f"Translation for {elem_id} is string but element type is {elem_type}"
                    )

            # Handle table elements (cells translation)
            elif isinstance(translation, dict) and 'cells' in translation:
                if elem_type == TABLE_TYPE and isinstance(elem.get('content'), dict):
                    _apply_table_translation(elem, translation)
                    applied_count += 1
                else:
                    logger.warning(
                        f"Translation for {elem_id} is table but element type is {elem_type}"
                    )

    logger.info(f"Applied {applied_count} translations to document")
    return translated_doc


def _apply_table_translation(
    table_elem: Dict,
    translation: Dict[str, Any]
) -> None:
    """
    Apply translation to a table element's cells (in place).

    Cells are matched on their (row, col) pair; cells with no matching
    translated entry keep their current content.

    Args:
        table_elem: Table element dict with content.cells
        translation: Translation dict with 'cells' list
    """
    content = table_elem.get('content', {})
    original_cells = content.get('cells', [])
    if not original_cells:
        return

    # Build lookup for translated cells by (row, col); a missing row/col
    # defaults to 0 and a missing content defaults to ''.
    translated_cells = {
        (cell.get('row', 0), cell.get('col', 0)): cell.get('content', '')
        for cell in translation.get('cells', [])
    }

    # Apply translations to matching cells
    for cell in original_cells:
        key = (cell.get('row', 0), cell.get('col', 0))
        if key in translated_cells:
            cell['content'] = translated_cells[key]


def load_translation_json(translation_path: Path) -> Optional[Dict]:
    """
    Load translation JSON file.

    Args:
        translation_path: Path to translation JSON file

    Returns:
        Parsed JSON content, or None if the file doesn't exist or could not
        be read/parsed (the failure is logged in the latter case)
    """
    try:
        with open(translation_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        # A missing file is an expected situation, not an error.
        return None
    except Exception as e:
        # Deliberately best-effort: this loader never raises to its caller.
        logger.error(f"Failed to load translation JSON: {e}")
        return None


def find_translation_file(
    result_dir: Path,
    target_lang: str
) -> Optional[Path]:
    """
    Find translation file for a given language in result directory.

    Args:
        result_dir: Directory containing result files
        target_lang: Target language code (e.g., 'en', 'zh-TW')

    Returns:
        Path to translation file or None if not found. When several files
        match, the first one in sorted path order is returned.
    """
    # Look for *_translated_{lang}.json pattern
    pattern = f"*_translated_{target_lang}.json"
    # glob() yields entries in arbitrary order; sort so the pick is stable.
    matches = sorted(result_dir.glob(pattern))
    return matches[0] if matches else None


def list_available_translations(result_dir: Path) -> List[str]:
    """
    List all available translation languages for a result directory.

    Args:
        result_dir: Directory containing result files

    Returns:
        Sorted list of distinct language codes with available translations
    """
    languages = set()

    for path in result_dir.glob("*_translated_*.json"):
        # Extract language from filename: xxx_translated_{lang}.json
        # (the text after the last '_translated_' marker in the stem)
        _, marker, lang = path.stem.rpartition('_translated_')
        if marker and lang:
            languages.add(lang)

    return sorted(languages)


@dataclass
class TranslationBatch:
    """A batch of items to translate together"""
    # Items queued for one API call, in submission order
    items: List[TranslatableItem] = field(default_factory=list)
    # Sum of len(item.content) over `items`
    total_chars: int = 0

    def can_add(self, item: TranslatableItem) -> bool:
        """Check if item fits within both the item-count and character limits"""
        item_chars = len(item.content)
        return (
            len(self.items) < MAX_BATCH_ITEMS and
            self.total_chars + item_chars <= MAX_BATCH_CHARS
        )

    def add(self, item: TranslatableItem):
        """Add item to batch (limits are not checked here; see can_add)"""
        self.items.append(item)
        self.total_chars += len(item.content)


class TranslationService:
    """
    Main translation service for document translation using DIFY AI.

    Features:
    - Extract translatable elements from a result JSON document
    - Batch translation via DIFY API
    - Single-item translation (translate_item); note that translate_batch
      itself keeps the original text when a batch fails rather than
      retrying item by item
    - Translation JSON generation
    - Progress tracking

    NOTE(review): token/latency counters live on the instance and are reset
    by translate_document, so concurrent translate_document calls on one
    instance would share them.
    """

    def __init__(self, dify_client: Optional[DifyClient] = None):
        self.dify_client = dify_client or get_dify_client()
        self._active_jobs: Dict[str, TranslationJobState] = {}
        self._jobs_lock = threading.Lock()
        self._total_tokens = 0
        self._total_latency = 0.0

    @staticmethod
    def _build_translated_item(
        item: TranslatableItem,
        translated_content: str
    ) -> TranslatedItem:
        """Pair a source item with its translated text."""
        return TranslatedItem(
            element_id=item.element_id,
            original_content=item.content,
            translated_content=translated_content,
            element_type=item.element_type,
            cell_position=item.cell_position
        )

    @classmethod
    def _keep_original(cls, item: TranslatableItem) -> TranslatedItem:
        """Build a result whose 'translation' is the untouched source text."""
        return cls._build_translated_item(item, item.content)

    def extract_translatable_elements(
        self,
        result_json: Dict
    ) -> Tuple[List[TranslatableItem], int]:
        """
        Extract all translatable elements from a result JSON.

        Text elements yield one item each; table elements yield one item per
        non-empty cell (element_type 'table_cell', with cell_position set).
        Content is stripped of surrounding whitespace and empty content is
        skipped.

        Args:
            result_json: Document JSON data

        Returns:
            Tuple of (list of TranslatableItem, total element count)
        """
        items = []
        total_elements = 0

        for page in result_json.get('pages', []):
            page_number = page.get('page_number', 1)

            for elem in page.get('elements', []):
                total_elements += 1
                elem_type = elem.get('type', '')
                elem_id = elem.get('element_id', '')
                content = elem.get('content')

                # Skip non-translatable types
                if elem_type in SKIP_TYPES:
                    continue

                # Handle text elements
                if elem_type in TRANSLATABLE_TEXT_TYPES and isinstance(content, str):
                    text = content.strip()
                    if text:  # Skip empty content
                        items.append(TranslatableItem(
                            element_id=elem_id,
                            content=text,
                            element_type=elem_type,
                            page_number=page_number
                        ))

                # Handle table elements
                elif elem_type == TABLE_TYPE and isinstance(content, dict):
                    for cell in content.get('cells', []):
                        cell_content = cell.get('content', '')
                        if isinstance(cell_content, str) and cell_content.strip():
                            items.append(TranslatableItem(
                                element_id=elem_id,
                                content=cell_content.strip(),
                                element_type='table_cell',
                                page_number=page_number,
                                cell_position=(cell.get('row', 0), cell.get('col', 0))
                            ))

        logger.info(
            f"Extracted {len(items)} translatable items from {total_elements} elements"
        )
        return items, total_elements

    def create_batches(self, items: List[TranslatableItem]) -> List[TranslationBatch]:
        """
        Create translation batches from items based on item and character limits.

        Items are packed greedily in order. An item that alone exceeds the
        character limit still gets a batch of its own.

        Args:
            items: List of TranslatableItem

        Returns:
            List of TranslationBatch
        """
        batches = []
        current_batch = TranslationBatch()

        for item in items:
            if not current_batch.can_add(item):
                # Save current batch (if it holds anything) and start a new one
                if current_batch.items:
                    batches.append(current_batch)
                current_batch = TranslationBatch()
            current_batch.add(item)

        # Don't forget the last batch
        if current_batch.items:
            batches.append(current_batch)

        logger.info(
            f"Created {len(batches)} batches from {len(items)} items "
            f"(max {MAX_BATCH_CHARS} chars, max {MAX_BATCH_ITEMS} items per batch)"
        )
        return batches

    def translate_batch(
        self,
        batch: TranslationBatch,
        target_lang: str,
        user_id: str
    ) -> List[TranslatedItem]:
        """
        Translate a batch of items using DIFY API.

        On DifyTranslationError the whole batch is returned with the original
        content as its "translation"; an individual item missing from the
        response is handled the same way.

        Args:
            batch: TranslationBatch to translate
            target_lang: Target language code
            user_id: User identifier for tracking

        Returns:
            List of TranslatedItem, one per batch item, in batch order
        """
        if not batch.items:
            return []

        # Extract texts in order
        texts = [item.content for item in batch.items]

        try:
            response = self.dify_client.translate_batch(
                texts=texts,
                target_lang=target_lang,
                user_id=user_id
            )
        except DifyTranslationError as e:
            logger.error(f"Batch translation failed: {e}")
            # Return items with original content on failure
            return [self._keep_original(item) for item in batch.items]

        self._total_tokens += response.total_tokens
        self._total_latency += response.latency

        # Map translations back to items
        translated_items = []
        for marker_id, item in enumerate(batch.items, start=1):  # Markers are 1-indexed
            if marker_id in response.translations:
                translated_content = response.translations[marker_id]
            else:
                # Missing translation - use original
                logger.warning(f"Missing translation for {item.element_id}, using original")
                translated_content = item.content

            translated_items.append(
                self._build_translated_item(item, translated_content)
            )

        return translated_items

    def translate_item(
        self,
        item: TranslatableItem,
        target_lang: str,
        user_id: str
    ) -> TranslatedItem:
        """
        Translate a single item using DIFY API (fallback for batch failures).

        Args:
            item: TranslatableItem to translate
            target_lang: Target language code
            user_id: User identifier for tracking

        Returns:
            TranslatedItem with translation result; on DifyTranslationError
            the original content is kept as the translation
        """
        try:
            response = self.dify_client.translate(
                text=item.content,
                target_lang=target_lang,
                user_id=user_id
            )
        except DifyTranslationError as e:
            logger.error(f"Translation failed for {item.element_id}: {e}")
            # Return original content on failure
            return self._keep_original(item)

        self._total_tokens += response.total_tokens
        self._total_latency += response.latency

        return self._build_translated_item(item, response.translated_text)

    def build_translation_result(
        self,
        translated_items: List[TranslatedItem],
        source_document: str,
        source_lang: str,
        target_lang: str,
        total_elements: int,
        processing_time: float,
        batch_count: int
    ) -> Dict:
        """
        Build the translation result JSON structure.

        Args:
            translated_items: List of TranslatedItem
            source_document: Source document filename
            source_lang: Source language
            target_lang: Target language
            total_elements: Total elements in document
            processing_time: Processing time in seconds
            batch_count: Number of batches used

        Returns:
            Translation result dictionary
        """
        # Build translations dict
        translations: Dict[str, Any] = {}
        total_chars = 0

        for item in translated_items:
            total_chars += len(item.translated_content)

            if item.element_type == 'table_cell':
                # Group table cells by element_id
                row, col = item.cell_position if item.cell_position else (0, 0)
                table_entry = translations.setdefault(item.element_id, {'cells': []})
                table_entry['cells'].append({
                    'row': row,
                    'col': col,
                    'content': item.translated_content
                })
            else:
                translations[item.element_id] = item.translated_content

        # Build statistics (a table counts once however many cells it has)
        translated_element_ids = {item.element_id for item in translated_items}
        skipped = total_elements - len(translated_element_ids)

        # datetime.utcnow() is deprecated; take an aware UTC time and drop the
        # tzinfo so the serialized form stays "<naive ISO timestamp>Z".
        translated_at = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'
        )

        return {
            'schema_version': '1.0.0',
            'source_document': source_document,
            'source_lang': source_lang,
            'target_lang': target_lang,
            'provider': 'dify',
            'translated_at': translated_at,
            'statistics': {
                'total_elements': total_elements,
                'translated_elements': len(translated_element_ids),
                'skipped_elements': skipped,
                'total_characters': total_chars,
                'processing_time_seconds': round(processing_time, 2),
                'total_tokens': self._total_tokens,
                'batch_count': batch_count
            },
            'translations': translations
        }

    def translate_document(
        self,
        task_id: str,
        result_json_path: Path,
        target_lang: str,
        source_lang: str = 'auto',
        progress_callback: Optional[Callable[[TranslationProgress], None]] = None
    ) -> Tuple[bool, Optional[Path], Optional[str]]:
        """
        Translate a document using batch processing and save the result.

        The output is written next to the source file as
        "<stem without '_result'>_translated_<target_lang>.json".

        Args:
            task_id: Task ID
            result_json_path: Path to source result.json
            target_lang: Target language (e.g., 'en', 'zh-TW')
            source_lang: Source language ('auto' for detection)
            progress_callback: Optional callback(progress: TranslationProgress)

        Returns:
            Tuple of (success, output_path, error_message). Any exception is
            logged and reported through error_message instead of being raised.
        """
        start_time = time.time()
        # Per-run counters (reported in the result statistics)
        self._total_tokens = 0
        self._total_latency = 0.0

        logger.info(
            f"Starting translation: task_id={task_id}, target={target_lang}"
        )

        try:
            # Load source JSON
            with open(result_json_path, 'r', encoding='utf-8') as f:
                result_json = json.load(f)

            source_document = result_json.get('metadata', {}).get('filename', 'unknown')

            # Extract translatable elements
            items, total_elements = self.extract_translatable_elements(result_json)

            if not items:
                logger.warning("No translatable elements found")
                return False, None, "No translatable elements found"

            # Create batches
            batches = self.create_batches(items)

            # Update initial progress
            if progress_callback:
                progress_callback(TranslationProgress(
                    total_elements=len(items)
                ))

            # Translate each batch
            all_translated: List[TranslatedItem] = []
            user_id = f"tool-ocr-{task_id}"
            processed_items = 0

            for batch_idx, batch in enumerate(batches):
                logger.info(
                    f"Translating batch {batch_idx + 1}/{len(batches)} "
                    f"({len(batch.items)} items, {batch.total_chars} chars)"
                )

                translated = self.translate_batch(batch, target_lang, user_id)
                all_translated.extend(translated)
                processed_items += len(batch.items)

                # Update progress
                if progress_callback:
                    progress_callback(TranslationProgress(
                        current_element=processed_items,
                        total_elements=len(items),
                        percentage=(processed_items / len(items)) * 100
                    ))

            # Build result
            processing_time = time.time() - start_time
            result = self.build_translation_result(
                translated_items=all_translated,
                source_document=source_document,
                source_lang=source_lang,
                target_lang=target_lang,
                total_elements=total_elements,
                processing_time=processing_time,
                batch_count=len(batches)
            )

            # Save result
            output_filename = result_json_path.stem.replace('_result', '')
            output_path = result_json_path.parent / f"{output_filename}_translated_{target_lang}.json"

            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)

            logger.info(
                f"Translation completed: {len(all_translated)} items in {len(batches)} batches, "
                f"{processing_time:.2f}s, {self._total_tokens} tokens, "
                f"saved to {output_path}"
            )

            return True, output_path, None

        except Exception as e:
            # Top-level boundary: record the traceback through logging rather
            # than printing it to stderr, then report failure to the caller.
            logger.exception(f"Translation failed: {e}")
            return False, None, str(e)

    def get_job_state(self, task_id: str) -> Optional[TranslationJobState]:
        """Get the current state of a translation job (None if unknown)"""
        with self._jobs_lock:
            return self._active_jobs.get(task_id)

    def set_job_state(self, task_id: str, state: TranslationJobState):
        """Set the state of a translation job"""
        with self._jobs_lock:
            self._active_jobs[task_id] = state

    def remove_job_state(self, task_id: str):
        """Remove a translation job state (no-op if the task is unknown)"""
        with self._jobs_lock:
            self._active_jobs.pop(task_id, None)


# Global singleton
_translation_service: Optional[TranslationService] = None


def get_translation_service() -> TranslationService:
    """Get the global TranslationService instance, creating it on first use"""
    global _translation_service
    if _translation_service is None:
        _translation_service = TranslationService()
    return _translation_service