Files
OCR/backend/app/routers/translate.py
egg efa7e4175c feat: optimize task file generation and add visualization download
Backend changes:
- Disable PP-Structure debug file generation by default
- Separate raw_ocr_regions.json generation from debug flag (critical file)
- Add visualization folder download endpoint as ZIP
- Add has_visualization field to TaskDetailResponse
- Stop generating Markdown files
- Save translated PDFs to task folder with caching

Frontend changes:
- Replace JSON/MD download buttons with PDF buttons in TaskHistoryPage
- Add visualization download button in TaskDetailPage
- Fix Processing page task switching issue (reset isNotFound)

Archives two OpenSpec proposals:
- optimize-task-files-and-visualization
- simplify-frontend-add-billing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 19:11:50 +08:00

708 lines
25 KiB
Python

"""
Tool_OCR - Translation Router
Handles document translation operations via DIFY AI API
"""
import logging
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from sqlalchemy.orm import Session
from app.core.deps import get_db, get_current_user
from app.core.config import settings
from app.models.user import User
from app.models.task import Task, TaskStatus
from app.schemas.translation import (
TranslationRequest,
TranslationStartResponse,
TranslationStatusResponse,
TranslationStatusEnum,
TranslationProgress,
TranslationListResponse,
TranslationListItem,
TranslationStatistics,
)
from app.services.task_service import task_service
from app.services.dify_client import LANGUAGE_NAMES
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route declared below is served under /api/v2/translate and tagged "Translation".
router = APIRouter(prefix="/api/v2/translate", tags=["Translation"])
def run_translation_task(
    task_id: str,
    task_db_id: int,
    target_lang: str,
    source_lang: str = "auto",
    user_id: Optional[int] = None
):
    """
    Background task to run document translation.

    Looks up the task, runs the translation service against its OCR result
    JSON, and publishes the job state (TRANSLATING, then COMPLETED or FAILED)
    through the translation service. On success, and when a user ID is given,
    a TranslationLog row with token/cost statistics is written.

    Args:
        task_id: Task UUID string
        task_db_id: Task database ID (for verification); not read by this
            function at present
        target_lang: Target language code
        source_lang: Source language code ('auto' for detection)
        user_id: User ID for logging translation statistics

    Returns:
        None. The outcome is reported via the translation service job state.
    """
    from app.core.database import SessionLocal
    from app.services.translation_service import get_translation_service
    from app.schemas.translation import TranslationJobState, TranslationProgress
    from app.models.translation_log import TranslationLog

    db = SessionLocal()
    translation_service = get_translation_service()
    # Captured once so every state published by this run (including the final
    # COMPLETED/FAILED one) reports when the job actually started.
    started_at = datetime.utcnow()

    def publish_state(job_status, progress=None, **extra):
        """Store a fresh job state for this task with the given status."""
        translation_service.set_job_state(task_id, TranslationJobState(
            task_id=task_id,
            target_lang=target_lang,
            source_lang=source_lang,
            status=job_status,
            progress=progress if progress is not None else TranslationProgress(),
            started_at=started_at,
            **extra
        ))

    try:
        logger.info(f"Starting translation for task {task_id} -> {target_lang}")

        # Get task to find result JSON path
        task = db.query(Task).filter(Task.task_id == task_id).first()
        if not task:
            logger.error(f"Task {task_id} not found")
            # Without this the job state would stay at whatever the caller
            # initialised it to (PENDING) and never reach a terminal status.
            publish_state(TranslationStatusEnum.FAILED, error_message="Task not found")
            return

        if not task.result_json_path:
            logger.error(f"Task {task_id} has no result JSON")
            publish_state(TranslationStatusEnum.FAILED, error_message="No OCR result found")
            return

        result_json_path = Path(task.result_json_path)
        if not result_json_path.exists():
            logger.error(f"Result JSON not found: {result_json_path}")
            publish_state(TranslationStatusEnum.FAILED, error_message="Result file not found")
            return

        # Update state to translating
        publish_state(TranslationStatusEnum.TRANSLATING)

        # Progress callback: refresh the stored state in place so fields such
        # as started_at are kept between progress updates.
        def progress_callback(progress: TranslationProgress):
            current_state = translation_service.get_job_state(task_id)
            if current_state:
                current_state.status = TranslationStatusEnum.TRANSLATING
                current_state.progress = progress
                translation_service.set_job_state(task_id, current_state)

        # Run translation
        success, output_path, error_message = translation_service.translate_document(
            task_id=task_id,
            result_json_path=result_json_path,
            target_lang=target_lang,
            source_lang=source_lang,
            progress_callback=progress_callback
        )

        if not success:
            publish_state(TranslationStatusEnum.FAILED, error_message=error_message)
            logger.error(f"Translation failed for task {task_id}: {error_message}")
            return

        publish_state(
            TranslationStatusEnum.COMPLETED,
            progress=TranslationProgress(percentage=100.0),
            completed_at=datetime.utcnow(),
            result_file_path=str(output_path) if output_path else None
        )
        logger.info(f"Translation completed for task {task_id}")

        # Log translation statistics to database. This is best-effort: a
        # failure here must not turn a finished translation into a failed job.
        if user_id and output_path:
            try:
                with open(output_path, 'r', encoding='utf-8') as f:
                    translation_result = json.load(f)

                # `or` fallbacks also cover keys that are present but null.
                stats = translation_result.get('statistics') or {}
                total_tokens = stats.get('total_tokens') or 0

                # Use actual price from Dify API if available, otherwise calculate estimated cost
                actual_price = stats.get('total_price') or 0.0
                if actual_price > 0:
                    estimated_cost = actual_price
                else:
                    # Fallback: Calculate estimated cost based on token pricing
                    estimated_cost = (total_tokens / 1_000_000) * settings.translation_cost_per_million_tokens

                translation_log = TranslationLog(
                    user_id=user_id,
                    task_id=task_id,
                    target_lang=target_lang,
                    source_lang=source_lang,
                    total_tokens=total_tokens,
                    input_tokens=0,  # Dify doesn't provide separate input/output tokens
                    output_tokens=0,
                    total_elements=stats.get('total_elements', 0),
                    translated_elements=stats.get('translated_elements', 0),
                    total_characters=stats.get('total_characters', 0),
                    processing_time_seconds=stats.get('processing_time_seconds', 0.0),
                    provider=translation_result.get('provider', 'dify'),
                    estimated_cost=estimated_cost
                )
                db.add(translation_log)
                db.commit()
                logger.info(f"Logged translation stats for task {task_id}: {total_tokens} tokens, ${estimated_cost:.6f}")
            except Exception as log_error:
                logger.error(f"Failed to log translation stats: {log_error}")
    except Exception as e:
        logger.exception(f"Translation failed for task {task_id}")
        publish_state(TranslationStatusEnum.FAILED, error_message=str(e))
    finally:
        db.close()
@router.post("/{task_id}", response_model=TranslationStartResponse, status_code=status.HTTP_202_ACCEPTED)
async def start_translation(
    task_id: str,
    request: TranslationRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Start a document translation job.

    - **task_id**: Task UUID of a completed OCR task
    - **target_lang**: Target language code (e.g., 'en', 'ja', 'zh-TW')
    - **source_lang**: Source language code ('auto' for automatic detection)

    Returns 202 Accepted with job information. Use /status endpoint to track progress.
    """
    from app.services.translation_service import get_translation_service
    from app.schemas.translation import TranslationJobState

    def reply(job_status, lang, message):
        """Build the response body shared by every successful exit below."""
        return TranslationStartResponse(
            task_id=task_id,
            status=job_status,
            target_lang=lang,
            message=message
        )

    # The lookup is scoped to the requesting user, so a task owned by someone
    # else is reported as missing.
    owned_task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )
    if not owned_task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # Only a finished OCR task can be translated.
    if owned_task.status != TaskStatus.COMPLETED:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot translate task in '{owned_task.status.value}' status. Task must be completed."
        )

    # The OCR result JSON must be recorded on the task and present on disk.
    ocr_result = Path(owned_task.result_json_path) if owned_task.result_json_path else None
    if ocr_result is None or not ocr_result.exists():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="OCR result not found. Please process the document first."
        )

    # Reject language codes the translation backend does not list.
    target_lang = request.target_lang
    if target_lang not in LANGUAGE_NAMES:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unsupported target language: {target_lang}. Supported: {', '.join(LANGUAGE_NAMES.keys())}"
        )

    # A translation file already on disk short-circuits the request.
    stem_without_suffix = ocr_result.stem.replace('_result', '')
    existing_translation = ocr_result.parent / f"{stem_without_suffix}_translated_{target_lang}.json"
    if existing_translation.exists():
        logger.info(f"Translation already exists: {existing_translation}")
        return reply(TranslationStatusEnum.COMPLETED, target_lang, "Translation already exists")

    # Job state is looked up by task ID alone; an active job (for whichever
    # language it was started with) is reported back instead of starting another.
    translation_service = get_translation_service()
    active_job = translation_service.get_job_state(task_id)
    running_statuses = (TranslationStatusEnum.PENDING, TranslationStatusEnum.TRANSLATING)
    if active_job and active_job.status in running_statuses:
        return reply(active_job.status, active_job.target_lang, "Translation already in progress")

    # Record the job as pending before handing it to the background runner.
    source_lang = request.source_lang
    pending_state = TranslationJobState(
        task_id=task_id,
        target_lang=target_lang,
        source_lang=source_lang,
        status=TranslationStatusEnum.PENDING,
        progress=TranslationProgress(),
        started_at=datetime.utcnow()
    )
    translation_service.set_job_state(task_id, pending_state)

    background_tasks.add_task(
        run_translation_task,
        task_id=task_id,
        task_db_id=owned_task.id,
        target_lang=target_lang,
        source_lang=source_lang,
        user_id=current_user.id
    )
    logger.info(f"Started translation job for task {task_id}, target_lang={target_lang}")

    return reply(TranslationStatusEnum.PENDING, target_lang, "Translation job started")
@router.get("/{task_id}/status", response_model=TranslationStatusResponse)
async def get_translation_status(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get the status of a translation job.

    - **task_id**: Task UUID

    Returns current translation status with progress information.
    """
    from app.services.translation_service import get_translation_service

    # Ownership check: the lookup is restricted to the requesting user.
    owned_task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )
    if not owned_task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # A job state held by the translation service takes precedence.
    job_state = get_translation_service().get_job_state(task_id)
    if job_state:
        return TranslationStatusResponse(
            task_id=task_id,
            status=job_state.status,
            target_lang=job_state.target_lang,
            progress=job_state.progress,
            error_message=job_state.error_message,
            started_at=job_state.started_at,
            completed_at=job_state.completed_at
        )

    # No job state: fall back to translation files on disk and report the most
    # recently modified one as completed.
    if owned_task.result_json_path:
        candidates = Path(owned_task.result_json_path).parent.glob("*_translated_*.json")
        newest = max(candidates, key=lambda path: path.stat().st_mtime, default=None)
        if newest is not None:
            # The language code is whatever follows "_translated_" in the stem.
            detected_lang = newest.stem.split("_translated_")[-1]
            return TranslationStatusResponse(
                task_id=task_id,
                status=TranslationStatusEnum.COMPLETED,
                target_lang=detected_lang,
                progress=TranslationProgress(percentage=100.0)
            )

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="No translation job found for this task"
    )
@router.get("/{task_id}/result")
async def get_translation_result(
    task_id: str,
    lang: str = Query(..., description="Target language code"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get the translation result for a specific language.

    - **task_id**: Task UUID
    - **lang**: Target language code (e.g., 'en', 'ja')

    Returns the translation JSON file. Responds 404 when the task, its OCR
    result, or the requested translation is missing, and 400 when **lang**
    is not usable as part of a file name.
    """
    # Verify task ownership
    task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    if not task.result_json_path:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="OCR result not found"
        )

    # Find translation file
    result_json_path = Path(task.result_json_path)
    base_name = result_json_path.stem.replace('_result', '')
    file_name = f"{base_name}_translated_{lang}.json"

    # `lang` comes straight from the query string and becomes part of a path.
    # If it introduces a path separator the name no longer equals its own final
    # component, and the lookup could leave the task's result directory.
    if Path(file_name).name != file_name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid language code"
        )

    translation_file = result_json_path.parent / file_name
    if not translation_file.exists():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Translation for language '{lang}' not found"
        )

    # Return as JSON response with proper content type
    return FileResponse(
        path=str(translation_file),
        filename=translation_file.name,
        media_type="application/json"
    )
@router.get("/{task_id}/translations", response_model=TranslationListResponse)
async def list_translations(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    List all available translations for a task.

    - **task_id**: Task UUID

    Returns list of available translations with metadata.
    """
    # Ownership check: the lookup is restricted to the requesting user.
    owned_task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )
    if not owned_task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # (statistics key, default used when the key is absent)
    stat_defaults = (
        ('total_elements', 0),
        ('translated_elements', 0),
        ('skipped_elements', 0),
        ('total_characters', 0),
        ('processing_time_seconds', 0.0),
        ('total_tokens', 0),
    )

    translations = []
    if owned_task.result_json_path:
        result_dir = Path(owned_task.result_json_path).parent
        for translation_file in result_dir.glob("*_translated_*.json"):
            try:
                # The language code is whatever follows "_translated_" in the stem.
                lang = translation_file.stem.split("_translated_")[-1]

                with open(translation_file, 'r', encoding='utf-8') as f:
                    payload = json.load(f)

                stats_data = payload.get('statistics', {})
                # A trailing 'Z' is rewritten to an explicit UTC offset before
                # parsing; a missing timestamp makes the parse fail, which
                # skips the file via the handler below.
                translated_at = datetime.fromisoformat(
                    payload.get('translated_at', '').replace('Z', '+00:00')
                )
                item = TranslationListItem(
                    target_lang=lang,
                    translated_at=translated_at,
                    provider=payload.get('provider', 'dify'),
                    statistics=TranslationStatistics(
                        **{key: stats_data.get(key, default) for key, default in stat_defaults}
                    ),
                    file_path=str(translation_file)
                )
            except Exception as e:
                # One unreadable file must not hide the others.
                logger.warning(f"Failed to read translation file {translation_file}: {e}")
                continue
            translations.append(item)

    return TranslationListResponse(
        task_id=task_id,
        translations=translations
    )
@router.delete("/{task_id}/translations/{lang}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_translation(
    task_id: str,
    lang: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Delete a specific translation.

    - **task_id**: Task UUID
    - **lang**: Target language code to delete

    Responds 204 on success, 404 when the task, its OCR result, or the
    translation file is missing, and 400 when **lang** is not usable as part
    of a file name.
    """
    # Verify task ownership
    task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    if not task.result_json_path:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="OCR result not found"
        )

    # Find translation file
    result_json_path = Path(task.result_json_path)
    base_name = result_json_path.stem.replace('_result', '')
    file_name = f"{base_name}_translated_{lang}.json"

    # `lang` is caller-supplied and ends up in the path handed to unlink().
    # If it introduces a path separator the name no longer equals its own final
    # component, and the delete could reach outside the task's result directory.
    if Path(file_name).name != file_name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid language code"
        )

    translation_file = result_json_path.parent / file_name

    # Delete file. Attempting the unlink directly (instead of exists() followed
    # by unlink()) means a file removed by a concurrent request is reported as
    # 404 rather than surfacing as an unhandled error.
    try:
        translation_file.unlink()
    except FileNotFoundError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Translation for language '{lang}' not found"
        ) from None

    logger.info(f"Deleted translation {lang} for task {task_id}")
    return None
@router.post("/{task_id}/pdf")
async def download_translated_pdf(
    task_id: str,
    lang: str = Query(..., description="Target language code"),
    format: str = Query("reflow", description="PDF format: 'layout' or 'reflow'"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Download a translated PDF.

    - **task_id**: Task UUID
    - **lang**: Target language code (e.g., 'en', 'ja')
    - **format**: PDF format - 'layout' (preserves positions with text wrapping) or 'reflow' (flowing layout).
      Any value other than 'layout' (case-insensitive) is treated as 'reflow'.

    Returns PDF file with translated content. The PDF is written next to the
    OCR result and reused on later requests while it is at least as new as the
    translation JSON.
    """
    from app.services.pdf_generator_service import pdf_generator_service
    from app.services.translation_service import list_available_translations

    # Verify task ownership
    task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    if not task.result_json_path:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="OCR result not found"
        )

    result_json_path = Path(task.result_json_path)
    if not result_json_path.exists():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Result file not found"
        )

    # Find translation file
    result_dir = result_json_path.parent
    base_name = result_json_path.stem.replace('_result', '').replace('edit_', '')
    primary_name = f"{base_name}_translated_{lang}.json"

    # `lang` comes straight from the query string and is embedded in every file
    # name used below (two lookup candidates and the generated PDF). If it
    # introduces a path separator the name no longer equals its own final
    # component, and those paths could leave the task's result directory.
    if Path(primary_name).name != primary_name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid language code"
        )

    translation_file = result_dir / primary_name
    # Fall back to the fixed "edit_" file name when the primary name is absent
    if not translation_file.exists():
        translation_file = result_dir / f"edit_translated_{lang}.json"

    if not translation_file.exists():
        # List available translations for error message
        available = list_available_translations(result_dir)
        if available:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Translation for language '{lang}' not found. Available translations: {', '.join(available)}"
            )
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No translations found for this task. Please translate the document first."
        )

    # Check translation status in translation JSON
    try:
        with open(translation_file, 'r', encoding='utf-8') as f:
            translation_data = json.load(f)
    except json.JSONDecodeError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid translation file format"
        )

    # Valid JSON that is not an object (e.g. a list) has no keys to inspect.
    if not isinstance(translation_data, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid translation file format"
        )

    # Check for translations (Direct Track) or raw_ocr_translations (OCR Track)
    has_translations = translation_data.get('translations')
    has_raw_ocr_translations = translation_data.get('raw_ocr_translations')
    if not has_translations and not has_raw_ocr_translations:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Translation file is empty or incomplete"
        )

    # Select the output style; unrecognised values are not rejected, they use reflow
    use_layout = format.lower() == 'layout'

    # Generate translated PDF to task result folder (not temp)
    # Use base name from result JSON (e.g., "scan" or "edit"); unlike the
    # translation lookup above, an "edit_" prefix is kept here.
    pdf_base_name = result_json_path.stem.replace('_result', '')
    format_suffix = '_layout' if use_layout else '_reflow'
    output_filename = f"{pdf_base_name}_translated_{lang}{format_suffix}.pdf"
    output_path = result_dir / output_filename

    def pdf_response():
        """Build the download response for the PDF at output_path."""
        return FileResponse(
            path=str(output_path),
            filename=output_filename,
            media_type="application/pdf",
            headers={
                "Content-Disposition": f'attachment; filename="{output_filename}"'
            }
        )

    def discard_partial_pdf():
        """Best-effort removal of output left behind by a failed generation."""
        # NOTE(review): assumes the generator may write directly to output_path
        # before failing - confirm against pdf_generator_service. If it does, a
        # leftover file would pass the mtime check below on the next request
        # and be served as if it were a valid cached PDF.
        try:
            output_path.unlink()
        except FileNotFoundError:
            pass
        except OSError as cleanup_error:
            logger.warning(f"Could not remove incomplete PDF {output_path}: {cleanup_error}")

    # Check if PDF already exists and is newer than translation JSON
    if output_path.exists():
        pdf_mtime = output_path.stat().st_mtime
        translation_mtime = translation_file.stat().st_mtime
        if pdf_mtime >= translation_mtime:
            # PDF is up-to-date, serve directly
            logger.info(f"Serving cached translated PDF: {output_path}")
            return pdf_response()

    try:
        # Choose PDF generation method based on format
        if use_layout:
            # Layout mode: preserve original positions with text wrapping
            generate_pdf = pdf_generator_service.generate_translated_layout_pdf
        else:
            # Reflow mode: flowing layout
            generate_pdf = pdf_generator_service.generate_translated_pdf

        success = generate_pdf(
            result_json_path=result_json_path,
            translation_json_path=translation_file,
            output_path=output_path,
            source_file_path=result_dir
        )
        if not success:
            discard_partial_pdf()
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to generate translated PDF"
            )

        logger.info(f"Generated translated PDF: {output_path}")
        return pdf_response()
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to generate translated PDF for task {task_id}")
        discard_partial_pdf()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to generate translated PDF: {str(e)}"
        )