""" Tool_OCR - Translation Service Document translation using DIFY AI API with batch processing """ import json import logging import threading import time from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from app.schemas.translation import ( TranslatableItem, TranslatedItem, TranslationJobState, TranslationProgress, TranslationStatusEnum, ) from app.services.dify_client import ( DifyClient, DifyTranslationError, get_dify_client, MAX_BATCH_CHARS, MAX_BATCH_ITEMS, ) logger = logging.getLogger(__name__) # Element types that should be translated TRANSLATABLE_TEXT_TYPES = {'text', 'title', 'header', 'footer', 'paragraph', 'footnote'} TABLE_TYPE = 'table' SKIP_TYPES = {'page_number', 'image', 'chart', 'logo', 'reference'} @dataclass class TranslationBatch: """A batch of items to translate together""" items: List[TranslatableItem] = field(default_factory=list) total_chars: int = 0 def can_add(self, item: TranslatableItem) -> bool: """Check if item can be added to this batch""" item_chars = len(item.content) return ( len(self.items) < MAX_BATCH_ITEMS and self.total_chars + item_chars <= MAX_BATCH_CHARS ) def add(self, item: TranslatableItem): """Add item to batch""" self.items.append(item) self.total_chars += len(item.content) class TranslationService: """ Main translation service for document translation using DIFY AI. Features: - Extract translatable elements from UnifiedDocument - Batch translation via DIFY API (efficient) - Fallback to single-item translation for failures - Translation JSON generation - Progress tracking """ def __init__(self, dify_client: Optional[DifyClient] = None): self.dify_client = dify_client or get_dify_client() self._active_jobs: Dict[str, TranslationJobState] = {} self._jobs_lock = threading.Lock() self._total_tokens = 0 self._total_latency = 0.0 def extract_translatable_elements( self, result_json: Dict ) -> Tuple[List[TranslatableItem], int]: """ Extract all translatable elements from a result JSON. Args: result_json: UnifiedDocument JSON data Returns: Tuple of (list of TranslatableItem, total element count) """ items = [] total_elements = 0 for page in result_json.get('pages', []): page_number = page.get('page_number', 1) for elem in page.get('elements', []): total_elements += 1 elem_type = elem.get('type', '') elem_id = elem.get('element_id', '') content = elem.get('content') # Skip non-translatable types if elem_type in SKIP_TYPES: continue # Handle text elements if elem_type in TRANSLATABLE_TEXT_TYPES and isinstance(content, str): text = content.strip() if text: # Skip empty content items.append(TranslatableItem( element_id=elem_id, content=text, element_type=elem_type, page_number=page_number )) # Handle table elements elif elem_type == TABLE_TYPE and isinstance(content, dict): cells = content.get('cells', []) for cell in cells: cell_content = cell.get('content', '') if isinstance(cell_content, str) and cell_content.strip(): row = cell.get('row', 0) col = cell.get('col', 0) items.append(TranslatableItem( element_id=elem_id, content=cell_content.strip(), element_type='table_cell', page_number=page_number, cell_position=(row, col) )) logger.info( f"Extracted {len(items)} translatable items from {total_elements} elements" ) return items, total_elements def create_batches(self, items: List[TranslatableItem]) -> List[TranslationBatch]: """ Create translation batches from items based on character limits. 


class TranslationService:
    """
    Main translation service for document translation using DIFY AI.

    Features:
    - Extracts translatable elements from a UnifiedDocument
    - Batch translation via the DIFY API (fewer requests)
    - Fallback to single-item translation for failures
    - Translation JSON generation
    - Progress tracking
    """

    def __init__(self, dify_client: Optional[DifyClient] = None):
        self.dify_client = dify_client or get_dify_client()
        self._active_jobs: Dict[str, TranslationJobState] = {}
        self._jobs_lock = threading.Lock()
        self._total_tokens = 0
        self._total_latency = 0.0

    def extract_translatable_elements(
        self, result_json: Dict
    ) -> Tuple[List[TranslatableItem], int]:
        """
        Extract all translatable elements from a result JSON.

        Args:
            result_json: UnifiedDocument JSON data

        Returns:
            Tuple of (list of TranslatableItem, total element count)
        """
        items: List[TranslatableItem] = []
        total_elements = 0

        for page in result_json.get('pages', []):
            page_number = page.get('page_number', 1)

            for elem in page.get('elements', []):
                total_elements += 1
                elem_type = elem.get('type', '')
                elem_id = elem.get('element_id', '')
                content = elem.get('content')

                # Skip non-translatable types
                if elem_type in SKIP_TYPES:
                    continue

                # Handle text elements
                if elem_type in TRANSLATABLE_TEXT_TYPES and isinstance(content, str):
                    text = content.strip()
                    if text:  # Skip empty content
                        items.append(TranslatableItem(
                            element_id=elem_id,
                            content=text,
                            element_type=elem_type,
                            page_number=page_number
                        ))

                # Handle table elements (each non-empty cell becomes an item)
                elif elem_type == TABLE_TYPE and isinstance(content, dict):
                    cells = content.get('cells', [])
                    for cell in cells:
                        cell_content = cell.get('content', '')
                        if isinstance(cell_content, str) and cell_content.strip():
                            row = cell.get('row', 0)
                            col = cell.get('col', 0)
                            items.append(TranslatableItem(
                                element_id=elem_id,
                                content=cell_content.strip(),
                                element_type='table_cell',
                                page_number=page_number,
                                cell_position=(row, col)
                            ))

        logger.info(
            f"Extracted {len(items)} translatable items from {total_elements} elements"
        )
        return items, total_elements

    def create_batches(self, items: List[TranslatableItem]) -> List[TranslationBatch]:
        """
        Create translation batches from items based on character limits.

        Args:
            items: List of TranslatableItem

        Returns:
            List of TranslationBatch
        """
        batches: List[TranslationBatch] = []
        current_batch = TranslationBatch()

        for item in items:
            if current_batch.can_add(item):
                current_batch.add(item)
            else:
                # Save the current batch and start a new one. Note that an item
                # longer than MAX_BATCH_CHARS still gets its own (oversized) batch.
                if current_batch.items:
                    batches.append(current_batch)
                current_batch = TranslationBatch()
                current_batch.add(item)

        # Don't forget the last batch
        if current_batch.items:
            batches.append(current_batch)

        logger.info(
            f"Created {len(batches)} batches from {len(items)} items "
            f"(max {MAX_BATCH_CHARS} chars, max {MAX_BATCH_ITEMS} items per batch)"
        )
        return batches

    def translate_batch(
        self,
        batch: TranslationBatch,
        target_lang: str,
        user_id: str
    ) -> List[TranslatedItem]:
        """
        Translate a batch of items using the DIFY API.

        Args:
            batch: TranslationBatch to translate
            target_lang: Target language code
            user_id: User identifier for tracking

        Returns:
            List of TranslatedItem
        """
        if not batch.items:
            return []

        # Extract texts in order
        texts = [item.content for item in batch.items]

        try:
            response = self.dify_client.translate_batch(
                texts=texts,
                target_lang=target_lang,
                user_id=user_id
            )
            self._total_tokens += response.total_tokens
            self._total_latency += response.latency

            # Map translations back to items
            translated_items = []
            for idx, item in enumerate(batch.items):
                marker_id = idx + 1  # Markers are 1-indexed
                if marker_id in response.translations:
                    translated_content = response.translations[marker_id]
                else:
                    # Missing translation - fall back to the original text
                    logger.warning(f"Missing translation for {item.element_id}, using original")
                    translated_content = item.content

                translated_items.append(TranslatedItem(
                    element_id=item.element_id,
                    original_content=item.content,
                    translated_content=translated_content,
                    element_type=item.element_type,
                    cell_position=item.cell_position
                ))

            return translated_items

        except DifyTranslationError as e:
            logger.error(f"Batch translation failed: {e}")
            # Return items with original content on failure
            return [
                TranslatedItem(
                    element_id=item.element_id,
                    original_content=item.content,
                    translated_content=item.content,  # Keep original
                    element_type=item.element_type,
                    cell_position=item.cell_position
                )
                for item in batch.items
            ]
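
    # Worked example of the marker mapping above (illustrative): for a batch of
    # three items, texts == [items[0].content, items[1].content, items[2].content],
    # and the client is expected to return translations keyed by 1-based marker,
    # e.g. response.translations == {1: '...', 2: '...', 3: '...'}. Any missing
    # marker falls back to that item's original text.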

    def translate_item(
        self,
        item: TranslatableItem,
        target_lang: str,
        user_id: str
    ) -> TranslatedItem:
        """
        Translate a single item using the DIFY API (fallback for batch failures).

        Args:
            item: TranslatableItem to translate
            target_lang: Target language code
            user_id: User identifier for tracking

        Returns:
            TranslatedItem with translation result
        """
        try:
            response = self.dify_client.translate(
                text=item.content,
                target_lang=target_lang,
                user_id=user_id
            )
            self._total_tokens += response.total_tokens
            self._total_latency += response.latency

            return TranslatedItem(
                element_id=item.element_id,
                original_content=item.content,
                translated_content=response.translated_text,
                element_type=item.element_type,
                cell_position=item.cell_position
            )

        except DifyTranslationError as e:
            logger.error(f"Translation failed for {item.element_id}: {e}")
            # Return original content on failure
            return TranslatedItem(
                element_id=item.element_id,
                original_content=item.content,
                translated_content=item.content,  # Keep original
                element_type=item.element_type,
                cell_position=item.cell_position
            )

    def build_translation_result(
        self,
        translated_items: List[TranslatedItem],
        source_document: str,
        source_lang: str,
        target_lang: str,
        total_elements: int,
        processing_time: float,
        batch_count: int
    ) -> Dict:
        """
        Build the translation result JSON structure.

        Args:
            translated_items: List of TranslatedItem
            source_document: Source document filename
            source_lang: Source language
            target_lang: Target language
            total_elements: Total elements in document
            processing_time: Processing time in seconds
            batch_count: Number of batches used

        Returns:
            Translation result dictionary
        """
        # Build translations dict
        translations: Dict[str, Any] = {}
        total_chars = 0

        for item in translated_items:
            total_chars += len(item.translated_content)

            if item.element_type == 'table_cell':
                # Group table cells by element_id
                if item.element_id not in translations:
                    translations[item.element_id] = {'cells': []}
                translations[item.element_id]['cells'].append({
                    'row': item.cell_position[0] if item.cell_position else 0,
                    'col': item.cell_position[1] if item.cell_position else 0,
                    'content': item.translated_content
                })
            else:
                translations[item.element_id] = item.translated_content

        # Build statistics
        translated_element_ids = {item.element_id for item in translated_items}
        skipped = total_elements - len(translated_element_ids)

        result = {
            'schema_version': '1.0.0',
            'source_document': source_document,
            'source_lang': source_lang,
            'target_lang': target_lang,
            'provider': 'dify',
            'translated_at': datetime.utcnow().isoformat() + 'Z',
            'statistics': {
                'total_elements': total_elements,
                'translated_elements': len(translated_element_ids),
                'skipped_elements': skipped,
                'total_characters': total_chars,
                'processing_time_seconds': round(processing_time, 2),
                'total_tokens': self._total_tokens,
                'batch_count': batch_count
            },
            'translations': translations
        }

        return result
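
    # Shape of the dictionary returned by build_translation_result (abridged,
    # derived from the code above):
    #
    #     {
    #       "schema_version": "1.0.0",
    #       "source_document": "...", "source_lang": "...", "target_lang": "...",
    #       "provider": "dify", "translated_at": "...Z",
    #       "statistics": {"total_elements": ..., "total_tokens": ..., ...},
    #       "translations": {
    #         "text-elem-id": "translated text",
    #         "table-elem-id": {"cells": [{"row": 0, "col": 0, "content": "..."}]}
    #       }
    #     }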

    def translate_document(
        self,
        task_id: str,
        result_json_path: Path,
        target_lang: str,
        source_lang: str = 'auto',
        progress_callback: Optional[Callable[[TranslationProgress], None]] = None
    ) -> Tuple[bool, Optional[Path], Optional[str]]:
        """
        Translate a document using batch processing and save the result.

        Args:
            task_id: Task ID
            result_json_path: Path to source result.json
            target_lang: Target language (e.g., 'en', 'zh-TW')
            source_lang: Source language ('auto' for detection)
            progress_callback: Optional callback(progress: TranslationProgress)

        Returns:
            Tuple of (success, output_path, error_message)
        """
        start_time = time.time()
        self._total_tokens = 0
        self._total_latency = 0.0

        logger.info(
            f"Starting translation: task_id={task_id}, target={target_lang}"
        )

        try:
            # Load source JSON
            with open(result_json_path, 'r', encoding='utf-8') as f:
                result_json = json.load(f)

            source_document = result_json.get('metadata', {}).get('filename', 'unknown')

            # Extract translatable elements
            items, total_elements = self.extract_translatable_elements(result_json)

            if not items:
                logger.warning("No translatable elements found")
                return False, None, "No translatable elements found"

            # Create batches
            batches = self.create_batches(items)

            # Report initial progress
            if progress_callback:
                progress_callback(TranslationProgress(
                    total_elements=len(items)
                ))

            # Translate each batch
            all_translated: List[TranslatedItem] = []
            user_id = f"tool-ocr-{task_id}"
            processed_items = 0

            for batch_idx, batch in enumerate(batches):
                logger.info(
                    f"Translating batch {batch_idx + 1}/{len(batches)} "
                    f"({len(batch.items)} items, {batch.total_chars} chars)"
                )

                translated = self.translate_batch(batch, target_lang, user_id)
                all_translated.extend(translated)
                processed_items += len(batch.items)

                # Update progress
                if progress_callback:
                    progress_callback(TranslationProgress(
                        current_element=processed_items,
                        total_elements=len(items),
                        percentage=(processed_items / len(items)) * 100
                    ))

            # Build result
            processing_time = time.time() - start_time
            result = self.build_translation_result(
                translated_items=all_translated,
                source_document=source_document,
                source_lang=source_lang,
                target_lang=target_lang,
                total_elements=total_elements,
                processing_time=processing_time,
                batch_count=len(batches)
            )

            # Save result next to the source as <name>_translated_<lang>.json
            output_filename = result_json_path.stem.replace('_result', '')
            output_path = result_json_path.parent / f"{output_filename}_translated_{target_lang}.json"

            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)

            logger.info(
                f"Translation completed: {len(all_translated)} items in {len(batches)} batches, "
                f"{processing_time:.2f}s, {self._total_tokens} tokens, "
                f"saved to {output_path}"
            )
            return True, output_path, None

        except Exception as e:
            logger.exception(f"Translation failed: {e}")
            return False, None, str(e)

    def get_job_state(self, task_id: str) -> Optional[TranslationJobState]:
        """Get the current state of a translation job."""
        with self._jobs_lock:
            return self._active_jobs.get(task_id)

    def set_job_state(self, task_id: str, state: TranslationJobState):
        """Set the state of a translation job."""
        with self._jobs_lock:
            self._active_jobs[task_id] = state

    def remove_job_state(self, task_id: str):
        """Remove a translation job state."""
        with self._jobs_lock:
            self._active_jobs.pop(task_id, None)


# Global singleton
_translation_service: Optional[TranslationService] = None


def get_translation_service() -> TranslationService:
    """Get the global TranslationService instance."""
    global _translation_service
    if _translation_service is None:
        _translation_service = TranslationService()
    return _translation_service
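

if __name__ == '__main__':
    # Minimal manual smoke test (a sketch: assumes a configured DIFY client and
    # an existing result JSON; the path and task id below are hypothetical).
    logging.basicConfig(level=logging.INFO)
    service = get_translation_service()

    def _on_progress(progress: TranslationProgress) -> None:
        print(f"progress: {progress.current_element}/{progress.total_elements}")

    ok, out_path, err = service.translate_document(
        task_id='demo-task',
        result_json_path=Path('output/demo_result.json'),
        target_lang='en',
        progress_callback=_on_progress,
    )
    print(f"success={ok} output={out_path} error={err}")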