Files
OCR/backend/app/routers/translate.py
egg 0dcea4a7e7 fix: use task.files relationship to get source file path
Task model doesn't have file_path attribute directly. Use the files
relationship to access TaskFile.stored_path for source file path.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-02 18:12:22 +08:00

642 lines
21 KiB
Python

"""
Tool_OCR - Translation Router
Handles document translation operations via DIFY AI API
"""
import logging
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from sqlalchemy.orm import Session
from app.core.deps import get_db, get_current_user
from app.core.config import settings
from app.models.user import User
from app.models.task import Task, TaskStatus
from app.schemas.translation import (
TranslationRequest,
TranslationStartResponse,
TranslationStatusResponse,
TranslationStatusEnum,
TranslationProgress,
TranslationListResponse,
TranslationListItem,
TranslationStatistics,
)
from app.services.task_service import task_service
from app.services.dify_client import LANGUAGE_NAMES
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v2/translate", tags=["Translation"])
def run_translation_task(
task_id: str,
task_db_id: int,
target_lang: str,
source_lang: str = "auto"
):
"""
Background task to run document translation.
Args:
task_id: Task UUID string
task_db_id: Task database ID (for verification)
target_lang: Target language code
source_lang: Source language code ('auto' for detection)
"""
from app.core.database import SessionLocal
from app.services.translation_service import get_translation_service
from app.schemas.translation import TranslationJobState, TranslationProgress
db = SessionLocal()
translation_service = get_translation_service()
try:
logger.info(f"Starting translation for task {task_id} -> {target_lang}")
# Get task to find result JSON path
task = db.query(Task).filter(Task.task_id == task_id).first()
if not task:
logger.error(f"Task {task_id} not found")
return
if not task.result_json_path:
logger.error(f"Task {task_id} has no result JSON")
translation_service.set_job_state(task_id, TranslationJobState(
task_id=task_id,
target_lang=target_lang,
source_lang=source_lang,
status=TranslationStatusEnum.FAILED,
progress=TranslationProgress(),
error_message="No OCR result found",
started_at=datetime.utcnow()
))
return
result_json_path = Path(task.result_json_path)
if not result_json_path.exists():
logger.error(f"Result JSON not found: {result_json_path}")
translation_service.set_job_state(task_id, TranslationJobState(
task_id=task_id,
target_lang=target_lang,
source_lang=source_lang,
status=TranslationStatusEnum.FAILED,
progress=TranslationProgress(),
error_message="Result file not found",
started_at=datetime.utcnow()
))
return
# Update state to translating
translation_service.set_job_state(task_id, TranslationJobState(
task_id=task_id,
target_lang=target_lang,
source_lang=source_lang,
status=TranslationStatusEnum.TRANSLATING,
progress=TranslationProgress(),
started_at=datetime.utcnow()
))
# Progress callback
def progress_callback(progress: TranslationProgress):
current_state = translation_service.get_job_state(task_id)
if current_state:
current_state.status = TranslationStatusEnum.TRANSLATING
current_state.progress = progress
translation_service.set_job_state(task_id, current_state)
# Run translation
success, output_path, error_message = translation_service.translate_document(
task_id=task_id,
result_json_path=result_json_path,
target_lang=target_lang,
source_lang=source_lang,
progress_callback=progress_callback
)
if success:
translation_service.set_job_state(task_id, TranslationJobState(
task_id=task_id,
target_lang=target_lang,
source_lang=source_lang,
status=TranslationStatusEnum.COMPLETED,
progress=TranslationProgress(percentage=100.0),
started_at=datetime.utcnow(),
completed_at=datetime.utcnow(),
result_file_path=str(output_path) if output_path else None
))
logger.info(f"Translation completed for task {task_id}")
else:
translation_service.set_job_state(task_id, TranslationJobState(
task_id=task_id,
target_lang=target_lang,
source_lang=source_lang,
status=TranslationStatusEnum.FAILED,
progress=TranslationProgress(),
error_message=error_message,
started_at=datetime.utcnow()
))
logger.error(f"Translation failed for task {task_id}: {error_message}")
except Exception as e:
logger.exception(f"Translation failed for task {task_id}")
translation_service.set_job_state(task_id, TranslationJobState(
task_id=task_id,
target_lang=target_lang,
source_lang=source_lang,
status=TranslationStatusEnum.FAILED,
progress=TranslationProgress(),
error_message=str(e),
started_at=datetime.utcnow()
))
finally:
db.close()
@router.post("/{task_id}", response_model=TranslationStartResponse, status_code=status.HTTP_202_ACCEPTED)
async def start_translation(
task_id: str,
request: TranslationRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Start a document translation job.
- **task_id**: Task UUID of a completed OCR task
- **target_lang**: Target language code (e.g., 'en', 'ja', 'zh-TW')
- **source_lang**: Source language code ('auto' for automatic detection)
Returns 202 Accepted with job information. Use /status endpoint to track progress.
"""
from app.services.translation_service import get_translation_service
from app.schemas.translation import TranslationJobState
# Get task
task = task_service.get_task_by_id(
db=db,
task_id=task_id,
user_id=current_user.id
)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
# Check task is completed
if task.status != TaskStatus.COMPLETED:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot translate task in '{task.status.value}' status. Task must be completed."
)
# Check result JSON exists
if not task.result_json_path or not Path(task.result_json_path).exists():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="OCR result not found. Please process the document first."
)
# Validate target language
target_lang = request.target_lang
if target_lang not in LANGUAGE_NAMES:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported target language: {target_lang}. Supported: {', '.join(LANGUAGE_NAMES.keys())}"
)
# Check if translation already exists
result_dir = Path(task.result_json_path).parent
existing_translation = result_dir / f"{Path(task.result_json_path).stem.replace('_result', '')}_translated_{target_lang}.json"
if existing_translation.exists():
logger.info(f"Translation already exists: {existing_translation}")
# Return as completed
return TranslationStartResponse(
task_id=task_id,
status=TranslationStatusEnum.COMPLETED,
target_lang=target_lang,
message="Translation already exists"
)
# Check if translation is already in progress
translation_service = get_translation_service()
current_job = translation_service.get_job_state(task_id)
if current_job and current_job.status in [TranslationStatusEnum.PENDING, TranslationStatusEnum.TRANSLATING]:
return TranslationStartResponse(
task_id=task_id,
status=current_job.status,
target_lang=current_job.target_lang,
message="Translation already in progress"
)
# Initialize job state
translation_service.set_job_state(task_id, TranslationJobState(
task_id=task_id,
target_lang=target_lang,
source_lang=request.source_lang,
status=TranslationStatusEnum.PENDING,
progress=TranslationProgress(),
started_at=datetime.utcnow()
))
# Start background translation task
background_tasks.add_task(
run_translation_task,
task_id=task_id,
task_db_id=task.id,
target_lang=target_lang,
source_lang=request.source_lang
)
logger.info(f"Started translation job for task {task_id}, target_lang={target_lang}")
return TranslationStartResponse(
task_id=task_id,
status=TranslationStatusEnum.PENDING,
target_lang=target_lang,
message="Translation job started"
)
@router.get("/{task_id}/status", response_model=TranslationStatusResponse)
async def get_translation_status(
task_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Get the status of a translation job.
- **task_id**: Task UUID
Returns current translation status with progress information.
"""
from app.services.translation_service import get_translation_service
# Verify task ownership
task = task_service.get_task_by_id(
db=db,
task_id=task_id,
user_id=current_user.id
)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
# Get job state
translation_service = get_translation_service()
job_state = translation_service.get_job_state(task_id)
if not job_state:
# No active job - check if any completed translations exist
if task.result_json_path:
result_dir = Path(task.result_json_path).parent
translated_files = list(result_dir.glob("*_translated_*.json"))
if translated_files:
# Return completed status for the most recent translation
latest_file = max(translated_files, key=lambda f: f.stat().st_mtime)
# Extract language from filename
lang = latest_file.stem.split("_translated_")[-1]
return TranslationStatusResponse(
task_id=task_id,
status=TranslationStatusEnum.COMPLETED,
target_lang=lang,
progress=TranslationProgress(percentage=100.0)
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No translation job found for this task"
)
return TranslationStatusResponse(
task_id=task_id,
status=job_state.status,
target_lang=job_state.target_lang,
progress=job_state.progress,
error_message=job_state.error_message,
started_at=job_state.started_at,
completed_at=job_state.completed_at
)
@router.get("/{task_id}/result")
async def get_translation_result(
task_id: str,
lang: str = Query(..., description="Target language code"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Get the translation result for a specific language.
- **task_id**: Task UUID
- **lang**: Target language code (e.g., 'en', 'ja')
Returns the translation JSON file.
"""
# Verify task ownership
task = task_service.get_task_by_id(
db=db,
task_id=task_id,
user_id=current_user.id
)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
if not task.result_json_path:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="OCR result not found"
)
# Find translation file
result_dir = Path(task.result_json_path).parent
base_name = Path(task.result_json_path).stem.replace('_result', '')
translation_file = result_dir / f"{base_name}_translated_{lang}.json"
if not translation_file.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Translation for language '{lang}' not found"
)
# Return as JSON response with proper content type
return FileResponse(
path=str(translation_file),
filename=translation_file.name,
media_type="application/json"
)
@router.get("/{task_id}/translations", response_model=TranslationListResponse)
async def list_translations(
task_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
List all available translations for a task.
- **task_id**: Task UUID
Returns list of available translations with metadata.
"""
# Verify task ownership
task = task_service.get_task_by_id(
db=db,
task_id=task_id,
user_id=current_user.id
)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
translations = []
if task.result_json_path:
result_dir = Path(task.result_json_path).parent
translated_files = list(result_dir.glob("*_translated_*.json"))
for translation_file in translated_files:
try:
# Extract language from filename
lang = translation_file.stem.split("_translated_")[-1]
# Read translation metadata
with open(translation_file, 'r', encoding='utf-8') as f:
data = json.load(f)
stats_data = data.get('statistics', {})
translations.append(TranslationListItem(
target_lang=lang,
translated_at=datetime.fromisoformat(data.get('translated_at', '').replace('Z', '+00:00')),
provider=data.get('provider', 'dify'),
statistics=TranslationStatistics(
total_elements=stats_data.get('total_elements', 0),
translated_elements=stats_data.get('translated_elements', 0),
skipped_elements=stats_data.get('skipped_elements', 0),
total_characters=stats_data.get('total_characters', 0),
processing_time_seconds=stats_data.get('processing_time_seconds', 0.0),
total_tokens=stats_data.get('total_tokens', 0)
),
file_path=str(translation_file)
))
except Exception as e:
logger.warning(f"Failed to read translation file {translation_file}: {e}")
continue
return TranslationListResponse(
task_id=task_id,
translations=translations
)
@router.delete("/{task_id}/translations/{lang}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_translation(
task_id: str,
lang: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Delete a specific translation.
- **task_id**: Task UUID
- **lang**: Target language code to delete
"""
# Verify task ownership
task = task_service.get_task_by_id(
db=db,
task_id=task_id,
user_id=current_user.id
)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
if not task.result_json_path:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="OCR result not found"
)
# Find translation file
result_dir = Path(task.result_json_path).parent
base_name = Path(task.result_json_path).stem.replace('_result', '')
translation_file = result_dir / f"{base_name}_translated_{lang}.json"
if not translation_file.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Translation for language '{lang}' not found"
)
# Delete file
translation_file.unlink()
logger.info(f"Deleted translation {lang} for task {task_id}")
return None
@router.post("/{task_id}/pdf")
async def download_translated_pdf(
task_id: str,
lang: str = Query(..., description="Target language code"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Download a translated PDF with layout preservation.
- **task_id**: Task UUID
- **lang**: Target language code (e.g., 'en', 'ja')
Returns PDF file with translated content preserving original layout.
"""
from app.services.pdf_generator_service import pdf_generator_service
from app.services.translation_service import list_available_translations
import tempfile
# Verify task ownership
task = task_service.get_task_by_id(
db=db,
task_id=task_id,
user_id=current_user.id
)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
if not task.result_json_path:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="OCR result not found"
)
result_json_path = Path(task.result_json_path)
if not result_json_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Result file not found"
)
# Find translation file
result_dir = result_json_path.parent
base_name = result_json_path.stem.replace('_result', '').replace('edit_', '')
translation_file = result_dir / f"{base_name}_translated_{lang}.json"
# Also try with edit_ prefix removed differently
if not translation_file.exists():
translation_file = result_dir / f"edit_translated_{lang}.json"
if not translation_file.exists():
# List available translations for error message
available = list_available_translations(result_dir)
if available:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Translation for language '{lang}' not found. Available translations: {', '.join(available)}"
)
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No translations found for this task. Please translate the document first."
)
# Check translation status in translation JSON
try:
with open(translation_file, 'r', encoding='utf-8') as f:
translation_data = json.load(f)
if not translation_data.get('translations'):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Translation file is empty or incomplete"
)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid translation file format"
)
# Generate translated PDF to temp file
output_filename = f"{task_id}_translated_{lang}.pdf"
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_file:
output_path = Path(tmp_file.name)
try:
# Get source file path for images if available
source_file_path = None
if task.files and len(task.files) > 0:
stored_path = task.files[0].stored_path
if stored_path and Path(stored_path).exists():
source_file_path = Path(stored_path)
success = pdf_generator_service.generate_translated_pdf(
result_json_path=result_json_path,
translation_json_path=translation_file,
output_path=output_path,
source_file_path=source_file_path
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to generate translated PDF"
)
logger.info(f"Generated translated PDF for task {task_id}, lang={lang}")
return FileResponse(
path=str(output_path),
filename=output_filename,
media_type="application/pdf",
headers={
"Content-Disposition": f'attachment; filename="{output_filename}"'
}
)
except HTTPException:
# Clean up temp file on HTTP errors
if output_path.exists():
output_path.unlink()
raise
except Exception as e:
# Clean up temp file on unexpected errors
if output_path.exists():
output_path.unlink()
logger.exception(f"Failed to generate translated PDF for task {task_id}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to generate translated PDF: {str(e)}"
)