""" Tool_OCR - Translation Router Handles document translation operations via DIFY AI API """ import logging import json from datetime import datetime from pathlib import Path from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status, Query, BackgroundTasks from fastapi.responses import FileResponse, JSONResponse from sqlalchemy.orm import Session from app.core.deps import get_db, get_current_user from app.core.config import settings from app.models.user import User from app.models.task import Task, TaskStatus from app.schemas.translation import ( TranslationRequest, TranslationStartResponse, TranslationStatusResponse, TranslationStatusEnum, TranslationProgress, TranslationListResponse, TranslationListItem, TranslationStatistics, ) from app.services.task_service import task_service from app.services.dify_client import LANGUAGE_NAMES logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v2/translate", tags=["Translation"]) def run_translation_task( task_id: str, task_db_id: int, target_lang: str, source_lang: str = "auto", user_id: int = None ): """ Background task to run document translation. Args: task_id: Task UUID string task_db_id: Task database ID (for verification) target_lang: Target language code source_lang: Source language code ('auto' for detection) user_id: User ID for logging translation statistics """ from app.core.database import SessionLocal from app.services.translation_service import get_translation_service from app.schemas.translation import TranslationJobState, TranslationProgress from app.models.translation_log import TranslationLog db = SessionLocal() translation_service = get_translation_service() try: logger.info(f"Starting translation for task {task_id} -> {target_lang}") # Get task to find result JSON path task = db.query(Task).filter(Task.task_id == task_id).first() if not task: logger.error(f"Task {task_id} not found") return if not task.result_json_path: logger.error(f"Task {task_id} has no result JSON") translation_service.set_job_state(task_id, TranslationJobState( task_id=task_id, target_lang=target_lang, source_lang=source_lang, status=TranslationStatusEnum.FAILED, progress=TranslationProgress(), error_message="No OCR result found", started_at=datetime.utcnow() )) return result_json_path = Path(task.result_json_path) if not result_json_path.exists(): logger.error(f"Result JSON not found: {result_json_path}") translation_service.set_job_state(task_id, TranslationJobState( task_id=task_id, target_lang=target_lang, source_lang=source_lang, status=TranslationStatusEnum.FAILED, progress=TranslationProgress(), error_message="Result file not found", started_at=datetime.utcnow() )) return # Update state to translating translation_service.set_job_state(task_id, TranslationJobState( task_id=task_id, target_lang=target_lang, source_lang=source_lang, status=TranslationStatusEnum.TRANSLATING, progress=TranslationProgress(), started_at=datetime.utcnow() )) # Progress callback def progress_callback(progress: TranslationProgress): current_state = translation_service.get_job_state(task_id) if current_state: current_state.status = TranslationStatusEnum.TRANSLATING current_state.progress = progress translation_service.set_job_state(task_id, current_state) # Run translation success, output_path, error_message = translation_service.translate_document( task_id=task_id, result_json_path=result_json_path, target_lang=target_lang, source_lang=source_lang, progress_callback=progress_callback ) if success: translation_service.set_job_state(task_id, TranslationJobState( task_id=task_id, target_lang=target_lang, source_lang=source_lang, status=TranslationStatusEnum.COMPLETED, progress=TranslationProgress(percentage=100.0), started_at=datetime.utcnow(), completed_at=datetime.utcnow(), result_file_path=str(output_path) if output_path else None )) logger.info(f"Translation completed for task {task_id}") # Log translation statistics to database if user_id and output_path: try: with open(output_path, 'r', encoding='utf-8') as f: translation_result = json.load(f) stats = translation_result.get('statistics', {}) total_tokens = stats.get('total_tokens', 0) # Use actual price from Dify API if available, otherwise calculate estimated cost actual_price = stats.get('total_price', 0.0) if actual_price > 0: estimated_cost = actual_price else: # Fallback: Calculate estimated cost based on token pricing estimated_cost = (total_tokens / 1_000_000) * settings.translation_cost_per_million_tokens translation_log = TranslationLog( user_id=user_id, task_id=task_id, target_lang=target_lang, source_lang=source_lang, total_tokens=total_tokens, input_tokens=0, # Dify doesn't provide separate input/output tokens output_tokens=0, total_elements=stats.get('total_elements', 0), translated_elements=stats.get('translated_elements', 0), total_characters=stats.get('total_characters', 0), processing_time_seconds=stats.get('processing_time_seconds', 0.0), provider=translation_result.get('provider', 'dify'), estimated_cost=estimated_cost ) db.add(translation_log) db.commit() logger.info(f"Logged translation stats for task {task_id}: {total_tokens} tokens, ${estimated_cost:.6f}") except Exception as log_error: logger.error(f"Failed to log translation stats: {log_error}") else: translation_service.set_job_state(task_id, TranslationJobState( task_id=task_id, target_lang=target_lang, source_lang=source_lang, status=TranslationStatusEnum.FAILED, progress=TranslationProgress(), error_message=error_message, started_at=datetime.utcnow() )) logger.error(f"Translation failed for task {task_id}: {error_message}") except Exception as e: logger.exception(f"Translation failed for task {task_id}") translation_service.set_job_state(task_id, TranslationJobState( task_id=task_id, target_lang=target_lang, source_lang=source_lang, status=TranslationStatusEnum.FAILED, progress=TranslationProgress(), error_message=str(e), started_at=datetime.utcnow() )) finally: db.close() @router.post("/{task_id}", response_model=TranslationStartResponse, status_code=status.HTTP_202_ACCEPTED) async def start_translation( task_id: str, request: TranslationRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Start a document translation job. - **task_id**: Task UUID of a completed OCR task - **target_lang**: Target language code (e.g., 'en', 'ja', 'zh-TW') - **source_lang**: Source language code ('auto' for automatic detection) Returns 202 Accepted with job information. Use /status endpoint to track progress. """ from app.services.translation_service import get_translation_service from app.schemas.translation import TranslationJobState # Get task task = task_service.get_task_by_id( db=db, task_id=task_id, user_id=current_user.id ) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found" ) # Check task is completed if task.status != TaskStatus.COMPLETED: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Cannot translate task in '{task.status.value}' status. Task must be completed." ) # Check result JSON exists if not task.result_json_path or not Path(task.result_json_path).exists(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="OCR result not found. Please process the document first." ) # Validate target language target_lang = request.target_lang if target_lang not in LANGUAGE_NAMES: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Unsupported target language: {target_lang}. Supported: {', '.join(LANGUAGE_NAMES.keys())}" ) # Check if translation already exists result_dir = Path(task.result_json_path).parent existing_translation = result_dir / f"{Path(task.result_json_path).stem.replace('_result', '')}_translated_{target_lang}.json" if existing_translation.exists(): logger.info(f"Translation already exists: {existing_translation}") # Return as completed return TranslationStartResponse( task_id=task_id, status=TranslationStatusEnum.COMPLETED, target_lang=target_lang, message="Translation already exists" ) # Check if translation is already in progress translation_service = get_translation_service() current_job = translation_service.get_job_state(task_id) if current_job and current_job.status in [TranslationStatusEnum.PENDING, TranslationStatusEnum.TRANSLATING]: return TranslationStartResponse( task_id=task_id, status=current_job.status, target_lang=current_job.target_lang, message="Translation already in progress" ) # Initialize job state translation_service.set_job_state(task_id, TranslationJobState( task_id=task_id, target_lang=target_lang, source_lang=request.source_lang, status=TranslationStatusEnum.PENDING, progress=TranslationProgress(), started_at=datetime.utcnow() )) # Start background translation task background_tasks.add_task( run_translation_task, task_id=task_id, task_db_id=task.id, target_lang=target_lang, source_lang=request.source_lang, user_id=current_user.id ) logger.info(f"Started translation job for task {task_id}, target_lang={target_lang}") return TranslationStartResponse( task_id=task_id, status=TranslationStatusEnum.PENDING, target_lang=target_lang, message="Translation job started" ) @router.get("/{task_id}/status", response_model=TranslationStatusResponse) async def get_translation_status( task_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Get the status of a translation job. - **task_id**: Task UUID Returns current translation status with progress information. """ from app.services.translation_service import get_translation_service # Verify task ownership task = task_service.get_task_by_id( db=db, task_id=task_id, user_id=current_user.id ) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found" ) # Get job state translation_service = get_translation_service() job_state = translation_service.get_job_state(task_id) if not job_state: # No active job - check if any completed translations exist if task.result_json_path: result_dir = Path(task.result_json_path).parent translated_files = list(result_dir.glob("*_translated_*.json")) if translated_files: # Return completed status for the most recent translation latest_file = max(translated_files, key=lambda f: f.stat().st_mtime) # Extract language from filename lang = latest_file.stem.split("_translated_")[-1] return TranslationStatusResponse( task_id=task_id, status=TranslationStatusEnum.COMPLETED, target_lang=lang, progress=TranslationProgress(percentage=100.0) ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No translation job found for this task" ) return TranslationStatusResponse( task_id=task_id, status=job_state.status, target_lang=job_state.target_lang, progress=job_state.progress, error_message=job_state.error_message, started_at=job_state.started_at, completed_at=job_state.completed_at ) @router.get("/{task_id}/result") async def get_translation_result( task_id: str, lang: str = Query(..., description="Target language code"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Get the translation result for a specific language. - **task_id**: Task UUID - **lang**: Target language code (e.g., 'en', 'ja') Returns the translation JSON file. """ # Verify task ownership task = task_service.get_task_by_id( db=db, task_id=task_id, user_id=current_user.id ) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found" ) if not task.result_json_path: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="OCR result not found" ) # Find translation file result_dir = Path(task.result_json_path).parent base_name = Path(task.result_json_path).stem.replace('_result', '') translation_file = result_dir / f"{base_name}_translated_{lang}.json" if not translation_file.exists(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Translation for language '{lang}' not found" ) # Return as JSON response with proper content type return FileResponse( path=str(translation_file), filename=translation_file.name, media_type="application/json" ) @router.get("/{task_id}/translations", response_model=TranslationListResponse) async def list_translations( task_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ List all available translations for a task. - **task_id**: Task UUID Returns list of available translations with metadata. """ # Verify task ownership task = task_service.get_task_by_id( db=db, task_id=task_id, user_id=current_user.id ) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found" ) translations = [] if task.result_json_path: result_dir = Path(task.result_json_path).parent translated_files = list(result_dir.glob("*_translated_*.json")) for translation_file in translated_files: try: # Extract language from filename lang = translation_file.stem.split("_translated_")[-1] # Read translation metadata with open(translation_file, 'r', encoding='utf-8') as f: data = json.load(f) stats_data = data.get('statistics', {}) translations.append(TranslationListItem( target_lang=lang, translated_at=datetime.fromisoformat(data.get('translated_at', '').replace('Z', '+00:00')), provider=data.get('provider', 'dify'), statistics=TranslationStatistics( total_elements=stats_data.get('total_elements', 0), translated_elements=stats_data.get('translated_elements', 0), skipped_elements=stats_data.get('skipped_elements', 0), total_characters=stats_data.get('total_characters', 0), processing_time_seconds=stats_data.get('processing_time_seconds', 0.0), total_tokens=stats_data.get('total_tokens', 0) ), file_path=str(translation_file) )) except Exception as e: logger.warning(f"Failed to read translation file {translation_file}: {e}") continue return TranslationListResponse( task_id=task_id, translations=translations ) @router.delete("/{task_id}/translations/{lang}", status_code=status.HTTP_204_NO_CONTENT) async def delete_translation( task_id: str, lang: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Delete a specific translation. - **task_id**: Task UUID - **lang**: Target language code to delete """ # Verify task ownership task = task_service.get_task_by_id( db=db, task_id=task_id, user_id=current_user.id ) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found" ) if not task.result_json_path: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="OCR result not found" ) # Find translation file result_dir = Path(task.result_json_path).parent base_name = Path(task.result_json_path).stem.replace('_result', '') translation_file = result_dir / f"{base_name}_translated_{lang}.json" if not translation_file.exists(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Translation for language '{lang}' not found" ) # Delete file translation_file.unlink() logger.info(f"Deleted translation {lang} for task {task_id}") return None @router.post("/{task_id}/pdf") async def download_translated_pdf( task_id: str, lang: str = Query(..., description="Target language code"), format: str = Query("reflow", description="PDF format: 'layout' or 'reflow'"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Download a translated PDF. - **task_id**: Task UUID - **lang**: Target language code (e.g., 'en', 'ja') - **format**: PDF format - 'layout' (preserves positions with text wrapping) or 'reflow' (flowing layout) Returns PDF file with translated content. """ from app.services.pdf_generator_service import pdf_generator_service from app.services.translation_service import list_available_translations import tempfile # Verify task ownership task = task_service.get_task_by_id( db=db, task_id=task_id, user_id=current_user.id ) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found" ) if not task.result_json_path: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="OCR result not found" ) result_json_path = Path(task.result_json_path) if not result_json_path.exists(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Result file not found" ) # Find translation file result_dir = result_json_path.parent base_name = result_json_path.stem.replace('_result', '').replace('edit_', '') translation_file = result_dir / f"{base_name}_translated_{lang}.json" # Also try with edit_ prefix removed differently if not translation_file.exists(): translation_file = result_dir / f"edit_translated_{lang}.json" if not translation_file.exists(): # List available translations for error message available = list_available_translations(result_dir) if available: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Translation for language '{lang}' not found. Available translations: {', '.join(available)}" ) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"No translations found for this task. Please translate the document first." ) # Check translation status in translation JSON try: with open(translation_file, 'r', encoding='utf-8') as f: translation_data = json.load(f) # Check for translations (Direct Track) or raw_ocr_translations (OCR Track) has_translations = translation_data.get('translations') has_raw_ocr_translations = translation_data.get('raw_ocr_translations') if not has_translations and not has_raw_ocr_translations: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Translation file is empty or incomplete" ) except json.JSONDecodeError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid translation file format" ) # Validate format parameter use_layout = format.lower() == 'layout' # Generate translated PDF to temp file format_suffix = '_layout' if use_layout else '_reflow' output_filename = f"{task_id}_translated_{lang}{format_suffix}.pdf" with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_file: output_path = Path(tmp_file.name) try: # Use result_dir as image source (contains extracted images) image_dir = result_json_path.parent # Choose PDF generation method based on format if use_layout: # Layout mode: preserve original positions with text wrapping success = pdf_generator_service.generate_translated_layout_pdf( result_json_path=result_json_path, translation_json_path=translation_file, output_path=output_path, source_file_path=image_dir ) else: # Reflow mode: flowing layout success = pdf_generator_service.generate_translated_pdf( result_json_path=result_json_path, translation_json_path=translation_file, output_path=output_path, source_file_path=image_dir ) if not success: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate translated PDF" ) logger.info(f"Generated translated PDF for task {task_id}, lang={lang}") return FileResponse( path=str(output_path), filename=output_filename, media_type="application/pdf", headers={ "Content-Disposition": f'attachment; filename="{output_filename}"' } ) except HTTPException: # Clean up temp file on HTTP errors if output_path.exists(): output_path.unlink() raise except Exception as e: # Clean up temp file on unexpected errors if output_path.exists(): output_path.unlink() logger.exception(f"Failed to generate translated PDF for task {task_id}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate translated PDF: {str(e)}" )