Files
OCR/backend/app/routers/tasks.py
egg fa1abcd8e6 feat: implement layout-preserving PDF generation with table reconstruction
Major Features:
- Add PDF generation service with Chinese font support
- Parse HTML tables from PP-StructureV3 and rebuild with ReportLab
- Extract table text for translation purposes
- Auto-filter text regions inside tables to avoid overlaps

Backend Changes:
1. pdf_generator_service.py (NEW)
   - HTMLTableParser: Parse HTML tables to extract structure
   - PDFGeneratorService: Generate layout-preserving PDFs
   - Coordinate transformation: OCR (top-left) → PDF (bottom-left)
   - Font size heuristics: 75% of bbox height with width checking
   - Table reconstruction: Parse HTML → ReportLab Table
   - Image embedding: Extract bbox from filenames

2. ocr_service.py
   - Add _extract_table_text() for translation support
   - Add output_dir parameter to save images to result directory
   - Extract bbox from image filenames (img_in_table_box_x1_y1_x2_y2.jpg)

3. tasks.py
   - Update process_task_ocr to use save_results() with PDF generation
   - Fix download_pdf endpoint to use database-stored PDF paths
   - Support on-demand PDF generation from JSON

4. config.py
   - Add chinese_font_path configuration
   - Add pdf_enable_bbox_debug flag

Frontend Changes:
1. PDFViewer.tsx (NEW)
   - React PDF viewer with zoom and pagination
   - Memoized file config to prevent unnecessary reloads

2. TaskDetailPage.tsx & ResultsPage.tsx
   - Integrate PDF preview and download

3. main.tsx
   - Configure PDF.js worker via CDN

4. vite.config.ts
   - Add host: '0.0.0.0' for network access
   - Use VITE_API_URL environment variable for backend proxy

Dependencies:
- reportlab: PDF generation library
- Noto Sans SC font: Chinese character support

🤖 Generated with Claude Code
https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-17 20:21:56 +08:00

750 lines
22 KiB
Python

"""
Tool_OCR - Task Management Router
Handles OCR task operations with user isolation
"""
import logging
from typing import Optional
from pathlib import Path
import shutil
import hashlib
from fastapi import APIRouter, Depends, HTTPException, status, Query, UploadFile, File, BackgroundTasks
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
import json
from datetime import datetime
from app.core.deps import get_db, get_current_user
from app.core.config import settings
from app.models.user import User
from app.models.task import TaskStatus, TaskFile
from app.schemas.task import (
TaskCreate,
TaskUpdate,
TaskResponse,
TaskDetailResponse,
TaskListResponse,
TaskStatsResponse,
TaskStatusEnum,
UploadResponse,
)
from app.services.task_service import task_service
from app.services.file_access_service import file_access_service
from app.services.ocr_service import OCRService
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Every route declared below is served under /api/v2/tasks and tagged "Tasks"
router = APIRouter(prefix="/api/v2/tasks", tags=["Tasks"])
def process_task_ocr(task_id: str, task_db_id: int, file_path: str, filename: str) -> None:
    """
    Background task to process OCR for a task

    Runs OCR on the uploaded file, saves the results through the OCR service
    and records the result paths, the processing time and the COMPLETED
    status on the task row. Any exception is logged and the task is marked
    FAILED with the exception text as its error message; nothing is re-raised.

    Args:
        task_id: Task UUID string
        task_db_id: Task database ID
        file_path: Path to uploaded file
        filename: Original filename
    """
    from app.core.database import SessionLocal
    from app.models.task import Task

    # The background task owns its own session; it is closed in `finally`
    db = SessionLocal()
    start_time = datetime.now()

    try:
        logger.info(f"Starting OCR processing for task {task_id}, file: {filename}")

        # Get task directly by database ID (bypass user isolation for background task)
        task = db.query(Task).filter(Task.id == task_db_id).first()
        if not task:
            logger.error(f"Task {task_id} not found in database")
            return

        # Initialize OCR service
        ocr_service = OCRService()

        # Create result directory before OCR processing (needed for saving extracted images)
        result_dir = Path(settings.result_dir) / task_id
        result_dir.mkdir(parents=True, exist_ok=True)

        # Process the file with OCR
        ocr_result = ocr_service.process_image(
            image_path=Path(file_path),
            lang='ch',
            detect_layout=True,
            output_dir=result_dir
        )

        # Calculate processing time (measured before results are saved,
        # so the time spent in save_results is not included)
        processing_time_ms = int((datetime.now() - start_time).total_seconds() * 1000)

        # Save results using OCR service (includes JSON, Markdown, and PDF generation)
        json_path, markdown_path, pdf_path = ocr_service.save_results(
            result=ocr_result,
            output_dir=result_dir,
            file_id=Path(filename).stem,
            source_file_path=Path(file_path)
        )

        # Update task with results (direct database update)
        task.result_json_path = str(json_path) if json_path else None
        task.result_markdown_path = str(markdown_path) if markdown_path else None
        task.result_pdf_path = str(pdf_path) if pdf_path else None
        task.processing_time_ms = processing_time_ms
        task.status = TaskStatus.COMPLETED
        task.completed_at = datetime.utcnow()
        task.updated_at = datetime.utcnow()
        db.commit()

        logger.info(f"OCR processing completed for task {task_id} in {processing_time_ms}ms")

    except Exception as e:
        logger.exception(f"OCR processing failed for task {task_id}")

        # Update task status to failed (direct database update)
        try:
            # Roll back first: if the failure came from a flush/commit the
            # session refuses further work until it is rolled back, and any
            # half-applied result fields on the task must not be persisted.
            db.rollback()
            task = db.query(Task).filter(Task.id == task_db_id).first()
            if task:
                task.status = TaskStatus.FAILED
                task.error_message = str(e)
                task.updated_at = datetime.utcnow()
                db.commit()
        except Exception as update_error:
            logger.error(f"Failed to update task status: {update_error}")
    finally:
        db.close()
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
async def create_task(
    task_data: TaskCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Create a new OCR task for the authenticated user

    - **filename**: Original filename (optional)
    - **file_type**: File MIME type (optional)
    """
    try:
        created = task_service.create_task(
            db=db,
            user_id=current_user.id,
            filename=task_data.filename,
            file_type=task_data.file_type,
        )
        logger.info(f"Created task {created.task_id} for user {current_user.email}")
    except Exception as exc:
        # Any failure while creating the task is reported as a 500
        logger.exception(f"Failed to create task for user {current_user.id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create task: {str(exc)}",
        )
    else:
        return created
@router.get("/", response_model=TaskListResponse)
async def list_tasks(
    status_filter: Optional[TaskStatusEnum] = Query(None, alias="status"),
    filename_search: Optional[str] = Query(None, alias="filename"),
    date_from: Optional[str] = Query(None, alias="date_from"),
    date_to: Optional[str] = Query(None, alias="date_to"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=100),
    order_by: str = Query("created_at"),
    order_desc: bool = Query(True),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    List user's tasks with pagination and filtering

    - **status**: Filter by task status (optional)
    - **filename**: Search by filename (partial match, optional)
    - **date_from**: Filter tasks from this date (YYYY-MM-DD, optional)
    - **date_to**: Filter tasks until this date (YYYY-MM-DD, optional)
    - **page**: Page number (starts from 1)
    - **page_size**: Number of tasks per page (max 100)
    - **order_by**: Sort field (created_at, updated_at, completed_at)
    - **order_desc**: Sort descending (default: true)

    Responds with 400 when a date filter is not a valid ISO 8601 value.
    """
    # Parse the date filters outside the generic handler below so malformed
    # client input is reported as a 400 instead of an internal server error.
    try:
        date_from_dt = datetime.fromisoformat(date_from) if date_from else None
        date_to_dt = datetime.fromisoformat(date_to) if date_to else None
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid date format for 'date_from' or 'date_to'; expected ISO 8601 (YYYY-MM-DD)"
        ) from None

    try:
        # Convert enum to model enum if provided
        status_enum = TaskStatus[status_filter.value.upper()] if status_filter else None

        # Calculate offset
        skip = (page - 1) * page_size

        # Get tasks
        tasks, total = task_service.get_user_tasks(
            db=db,
            user_id=current_user.id,
            status=status_enum,
            filename_search=filename_search,
            date_from=date_from_dt,
            date_to=date_to_dt,
            skip=skip,
            limit=page_size,
            order_by=order_by,
            order_desc=order_desc
        )

        # More pages exist while the rows seen so far do not reach the total
        has_more = (skip + len(tasks)) < total

        return {
            "tasks": tasks,
            "total": total,
            "page": page,
            "page_size": page_size,
            "has_more": has_more
        }
    except Exception as e:
        logger.exception(f"Failed to list tasks for user {current_user.id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list tasks: {str(e)}"
        )
@router.get("/stats", response_model=TaskStatsResponse)
async def get_task_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get task statistics for the current user

    Returns counts by status
    """
    try:
        return task_service.get_user_stats(db=db, user_id=current_user.id)
    except Exception as exc:
        # Any failure while collecting statistics is reported as a 500
        logger.exception(f"Failed to get stats for user {current_user.id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get statistics: {str(exc)}",
        )
@router.get("/{task_id}", response_model=TaskDetailResponse)
async def get_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get the details of one task by ID

    - **task_id**: Task UUID
    """
    # The lookup is scoped to the requesting user's ID
    found = task_service.get_task_by_id(db=db, task_id=task_id, user_id=current_user.id)
    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Task not found",
    )
@router.patch("/{task_id}", response_model=TaskResponse)
async def update_task(
    task_id: str,
    task_update: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Update task status and results

    - **task_id**: Task UUID
    - **status**: New task status (optional)
    - **error_message**: Error message if failed (optional)
    - **processing_time_ms**: Processing time in milliseconds (optional)
    - **result_json_path**: Path to JSON result (optional)
    - **result_markdown_path**: Path to Markdown result (optional)
    - **result_pdf_path**: Path to searchable PDF (optional)

    When neither a status nor any result path is supplied, the task is
    returned unchanged.
    """
    try:
        # Stays None when the payload triggers neither update below
        task = None

        # Update status if provided
        if task_update.status:
            status_enum = TaskStatus[task_update.status.value.upper()]
            task = task_service.update_task_status(
                db=db,
                task_id=task_id,
                user_id=current_user.id,
                status=status_enum,
                error_message=task_update.error_message,
                processing_time_ms=task_update.processing_time_ms
            )

            if not task:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found"
                )

        # Update result paths if provided
        if any([
            task_update.result_json_path,
            task_update.result_markdown_path,
            task_update.result_pdf_path
        ]):
            task = task_service.update_task_results(
                db=db,
                task_id=task_id,
                user_id=current_user.id,
                result_json_path=task_update.result_json_path,
                result_markdown_path=task_update.result_markdown_path,
                result_pdf_path=task_update.result_pdf_path
            )

            if not task:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found"
                )

        # Nothing was updated: load the current task so the response is still
        # a valid TaskResponse (previously this path hit an unbound local
        # variable and surfaced as a 500).
        if task is None:
            task = task_service.get_task_by_id(
                db=db,
                task_id=task_id,
                user_id=current_user.id
            )
            if not task:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found"
                )

        return task

    except HTTPException:
        # Pass the 404s raised above through untouched
        raise
    except Exception as e:
        logger.exception(f"Failed to update task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update task: {str(e)}"
        )
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Delete one of the current user's tasks

    - **task_id**: Task UUID
    """
    deleted = task_service.delete_task(db=db, task_id=task_id, user_id=current_user.id)

    if deleted:
        logger.info(f"Deleted task {task_id} for user {current_user.email}")
        return None

    # A falsy result from the service is reported as "not found"
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Task not found",
    )
@router.get("/{task_id}/download/json", summary="Download JSON result")
async def download_json(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Download the task result as a JSON file

    - **task_id**: Task UUID
    """
    # Look the task up for the requesting user
    record = task_service.get_task_by_id(db=db, task_id=task_id, user_id=current_user.id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )

    # The file access service decides whether the stored path may be served
    json_path = record.result_json_path
    allowed, reason = file_access_service.validate_file_access(
        db=db,
        user_id=current_user.id,
        task_id=task_id,
        file_path=json_path,
    )
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=reason,
        )

    # Download name falls back to the task ID when no filename is stored
    download_name = f"{record.filename or task_id}_result.json"
    return FileResponse(
        path=json_path,
        filename=download_name,
        media_type="application/json",
    )
@router.get("/{task_id}/download/markdown", summary="Download Markdown result")
async def download_markdown(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Download the task result as a Markdown file

    - **task_id**: Task UUID
    """
    # Look the task up for the requesting user
    record = task_service.get_task_by_id(db=db, task_id=task_id, user_id=current_user.id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )

    # The file access service decides whether the stored path may be served
    markdown_path = record.result_markdown_path
    allowed, reason = file_access_service.validate_file_access(
        db=db,
        user_id=current_user.id,
        task_id=task_id,
        file_path=markdown_path,
    )
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=reason,
        )

    # Download name falls back to the task ID when no filename is stored
    download_name = f"{record.filename or task_id}_result.md"
    return FileResponse(
        path=markdown_path,
        filename=download_name,
        media_type="text/markdown",
    )
@router.get("/{task_id}/download/pdf", summary="Download PDF result")
async def download_pdf(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Download the task result as a layout-preserving PDF file

    - **task_id**: Task UUID

    Returns a PDF that preserves the original document layout using OCR results.
    The PDF is generated from OCR JSON data and cached for subsequent requests.
    """
    from pathlib import Path
    from app.services.pdf_generator_service import pdf_generator_service

    # Look the task up for the requesting user
    task = task_service.get_task_by_id(db=db, task_id=task_id, user_id=current_user.id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )

    # Only completed tasks have results to render
    if task.status.value != "completed":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Task is not completed yet. Please wait for OCR processing to finish.",
        )

    stored_pdf = Path(task.result_pdf_path) if task.result_pdf_path else None
    if stored_pdf is not None and stored_pdf.exists():
        # Preferred path: the PDF recorded on the task is still on disk
        pdf_path = stored_pdf
        logger.info(f"Using pre-generated PDF from database: {pdf_path.name}")
    else:
        # Fallback: build the PDF on demand from the OCR JSON result
        result_dir = Path(settings.result_dir) / task_id

        stored_json = Path(task.result_json_path) if task.result_json_path else None
        if stored_json is not None and stored_json.exists():
            json_path = stored_json
        else:
            # No usable stored path: take the first match in the result directory
            json_candidates = [*result_dir.glob("*_result.json")]
            if not json_candidates:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="OCR result JSON not found",
                )
            json_path = json_candidates[0]

        # <name>_result.json maps to <name>_layout.pdf in the result directory
        layout_stem = json_path.stem.replace("_result", "_layout")
        pdf_path = result_dir / (layout_stem + ".pdf")

        # A PDF left on disk by an earlier request is reused as-is
        if not pdf_path.exists():
            logger.info(f"Generating layout-preserving PDF for task {task_id}")

            # Pass the uploaded source file along only when it still exists
            source_file = None
            upload = db.query(TaskFile).filter(TaskFile.task_id == task.id).first()
            if upload and upload.stored_path and Path(upload.stored_path).exists():
                source_file = Path(upload.stored_path)

            generated = pdf_generator_service.generate_layout_pdf(
                json_path=json_path,
                output_path=pdf_path,
                source_file_path=source_file,
            )
            if not generated:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to generate PDF. Please check server logs.",
                )

            logger.info(f"PDF generated successfully: {pdf_path.name}")

    # The file access service decides whether the resolved path may be served
    allowed, reason = file_access_service.validate_file_access(
        db=db,
        user_id=current_user.id,
        task_id=task_id,
        file_path=str(pdf_path),
    )
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=reason,
        )

    # Download name falls back to the task ID when no filename is stored
    download_name = f"{task.filename or task_id}_result.pdf"
    return FileResponse(
        path=str(pdf_path),
        filename=download_name,
        media_type="application/pdf",
    )
@router.post("/{task_id}/start", response_model=TaskResponse, summary="Start task processing")
async def start_task(
    task_id: str,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Start processing a pending task

    - **task_id**: Task UUID
    """
    try:
        # Look the task up for the requesting user
        task = task_service.get_task_by_id(db=db, task_id=task_id, user_id=current_user.id)
        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task not found",
            )

        # Only a pending task may be started
        if task.status != TaskStatus.PENDING:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Cannot start task in '{task.status.value}' status",
            )

        # The uploaded file supplies the path and original name for OCR
        upload = db.query(TaskFile).filter(TaskFile.task_id == task.id).first()
        if not upload:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task file not found",
            )

        # Mark the task as processing before the background job is queued
        task = task_service.update_task_status(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
            status=TaskStatus.PROCESSING,
        )

        # Queue the OCR job as a FastAPI background task
        background_tasks.add_task(
            process_task_ocr,
            task_id=task_id,
            task_db_id=task.id,
            file_path=upload.stored_path,
            filename=upload.original_name,
        )

        logger.info(f"Started OCR processing task {task_id} for user {current_user.email}")
        return task
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged
        raise
    except Exception as exc:
        logger.exception(f"Failed to start task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to start task: {str(exc)}",
        )
@router.post("/{task_id}/cancel", response_model=TaskResponse, summary="Cancel task")
async def cancel_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Cancel a pending or processing task

    - **task_id**: Task UUID
    """
    try:
        # Look the task up for the requesting user
        task = task_service.get_task_by_id(db=db, task_id=task_id, user_id=current_user.id)
        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task not found",
            )

        # Only pending or processing tasks can be cancelled
        cancellable = (TaskStatus.PENDING, TaskStatus.PROCESSING)
        if task.status not in cancellable:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Cannot cancel task in '{task.status.value}' status",
            )

        # Cancellation is stored as FAILED with a fixed error message
        task = task_service.update_task_status(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
            status=TaskStatus.FAILED,
            error_message="Task cancelled by user",
        )

        logger.info(f"Cancelled task {task_id} for user {current_user.email}")
        return task
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged
        raise
    except Exception as exc:
        logger.exception(f"Failed to cancel task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cancel task: {str(exc)}",
        )
@router.post("/{task_id}/retry", response_model=TaskResponse, summary="Retry failed task")
async def retry_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Retry a failed task

    - **task_id**: Task UUID
    """
    try:
        # Look the task up for the requesting user
        task = task_service.get_task_by_id(db=db, task_id=task_id, user_id=current_user.id)
        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task not found",
            )

        # Only failed tasks can be retried
        if task.status != TaskStatus.FAILED:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Cannot retry task in '{task.status.value}' status",
            )

        # Put the task back into the pending state; no OCR job is queued here
        task = task_service.update_task_status(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
            status=TaskStatus.PENDING,
            error_message=None,
        )

        logger.info(f"Retrying task {task_id} for user {current_user.email}")
        return task
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged
        raise
    except Exception as exc:
        logger.exception(f"Failed to retry task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retry task: {str(exc)}",
        )