"""
Tool_OCR - Task Management Router
Handles OCR task operations with user isolation
"""

import logging
from datetime import datetime
from typing import Optional
from pathlib import Path
import shutil
import hashlib

from fastapi import APIRouter, Depends, HTTPException, status, Query, UploadFile, File
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session

from app.core.deps import get_db, get_current_user
from app.core.config import settings
from app.models.user import User
from app.models.task import TaskStatus, TaskFile
from app.schemas.task import (
    TaskCreate,
    TaskUpdate,
    TaskResponse,
    TaskDetailResponse,
    TaskListResponse,
    TaskStatsResponse,
    TaskStatusEnum,
    UploadResponse,
)
from app.services.task_service import task_service
from app.services.file_access_service import file_access_service

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v2/tasks", tags=["Tasks"])


@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
async def create_task(
    task_data: TaskCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Create a new OCR task

    - **filename**: Original filename (optional)
    - **file_type**: File MIME type (optional)
    """
    try:
        task = task_service.create_task(
            db=db,
            user_id=current_user.id,
            filename=task_data.filename,
            file_type=task_data.file_type
        )

        logger.info(f"Created task {task.task_id} for user {current_user.email}")
        return task

    except Exception as e:
        logger.exception(f"Failed to create task for user {current_user.id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create task: {str(e)}"
        )


@router.get("/", response_model=TaskListResponse)
async def list_tasks(
    status_filter: Optional[TaskStatusEnum] = Query(None, alias="status"),
    filename_search: Optional[str] = Query(None, alias="filename"),
    date_from: Optional[str] = Query(None, alias="date_from"),
    date_to: Optional[str] = Query(None, alias="date_to"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=100),
    order_by: str = Query("created_at"),
    order_desc: bool = Query(True),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    List the current user's tasks with pagination and filtering

    - **status**: Filter by task status (optional)
    - **filename**: Search by filename (partial match, optional)
    - **date_from**: Filter tasks from this date (YYYY-MM-DD, optional)
    - **date_to**: Filter tasks until this date (YYYY-MM-DD, optional)
    - **page**: Page number (starts from 1)
    - **page_size**: Number of tasks per page (max 100)
    - **order_by**: Sort field (created_at, updated_at, completed_at)
    - **order_desc**: Sort descending (default: true)
    """
    # Parse date strings before the main try block so that malformed
    # client input returns 400 instead of being reported as a 500
    try:
        date_from_dt = datetime.fromisoformat(date_from) if date_from else None
        date_to_dt = datetime.fromisoformat(date_to) if date_to else None
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid date format; expected YYYY-MM-DD"
        )

    try:
        # Convert the schema enum to the model enum if provided
        status_enum = TaskStatus[status_filter.value.upper()] if status_filter else None

        # Calculate offset
        skip = (page - 1) * page_size

        # Get tasks
        tasks, total = task_service.get_user_tasks(
            db=db,
            user_id=current_user.id,
            status=status_enum,
            filename_search=filename_search,
            date_from=date_from_dt,
            date_to=date_to_dt,
            skip=skip,
            limit=page_size,
            order_by=order_by,
            order_desc=order_desc
        )

        # Calculate pagination
        has_more = (skip + len(tasks)) < total

        return {
            "tasks": tasks,
            "total": total,
            "page": page,
            "page_size": page_size,
            "has_more": has_more
        }

    except Exception as e:
        logger.exception(f"Failed to list tasks for user {current_user.id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list tasks: {str(e)}"
        )


@router.get("/stats", response_model=TaskStatsResponse)
async def get_task_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get task statistics for the current user

    Returns counts by status
    """
    try:
        stats = task_service.get_user_stats(db=db, user_id=current_user.id)
        return stats

    except Exception as e:
        logger.exception(f"Failed to get stats for user {current_user.id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get statistics: {str(e)}"
        )


@router.get("/{task_id}", response_model=TaskDetailResponse)
async def get_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get task details by ID

    - **task_id**: Task UUID
    """
    task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )

    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    return task


@router.patch("/{task_id}", response_model=TaskResponse)
async def update_task(
    task_id: str,
    task_update: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Update task status and results

    - **task_id**: Task UUID
    - **status**: New task status (optional)
    - **error_message**: Error message if failed (optional)
    - **processing_time_ms**: Processing time in milliseconds (optional)
    - **result_json_path**: Path to JSON result (optional)
    - **result_markdown_path**: Path to Markdown result (optional)
    - **result_pdf_path**: Path to searchable PDF (optional)
    """
    try:
        # Look up the task first: without this, a request that sets no
        # updatable fields would leave `task` unbound and surface as a 500
        task = task_service.get_task_by_id(
            db=db,
            task_id=task_id,
            user_id=current_user.id
        )

        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task not found"
            )

        # Update status if provided
        if task_update.status:
            status_enum = TaskStatus[task_update.status.value.upper()]
            task = task_service.update_task_status(
                db=db,
                task_id=task_id,
                user_id=current_user.id,
                status=status_enum,
                error_message=task_update.error_message,
                processing_time_ms=task_update.processing_time_ms
            )

            if not task:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found"
                )

        # Update result paths if provided
        if any([
            task_update.result_json_path,
            task_update.result_markdown_path,
            task_update.result_pdf_path
        ]):
            task = task_service.update_task_results(
                db=db,
                task_id=task_id,
                user_id=current_user.id,
                result_json_path=task_update.result_json_path,
                result_markdown_path=task_update.result_markdown_path,
                result_pdf_path=task_update.result_pdf_path
            )

            if not task:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found"
                )

        return task

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to update task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update task: {str(e)}"
        )


@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Delete a task

    - **task_id**: Task UUID
    """
    success = task_service.delete_task(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )

    if not success:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    logger.info(f"Deleted task {task_id} for user {current_user.email}")
    return None


@router.get("/{task_id}/download/json", summary="Download JSON result")
async def download_json(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Download task result as JSON file

    - **task_id**: Task UUID
    """
    # Get task
    task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )

    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # Validate file access
    is_valid, error_msg = file_access_service.validate_file_access(
        db=db,
        user_id=current_user.id,
        task_id=task_id,
        file_path=task.result_json_path
    )

    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=error_msg
        )

    # Return file
    filename = f"{task.filename or task_id}_result.json"
    return FileResponse(
        path=task.result_json_path,
        filename=filename,
        media_type="application/json"
    )


@router.get("/{task_id}/download/markdown", summary="Download Markdown result")
async def download_markdown(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Download task result as Markdown file

    - **task_id**: Task UUID
    """
    # Get task
    task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )

    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # Validate file access
    is_valid, error_msg = file_access_service.validate_file_access(
        db=db,
        user_id=current_user.id,
        task_id=task_id,
        file_path=task.result_markdown_path
    )

    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=error_msg
        )

    # Return file
    filename = f"{task.filename or task_id}_result.md"
    return FileResponse(
        path=task.result_markdown_path,
        filename=filename,
        media_type="text/markdown"
    )


@router.get("/{task_id}/download/pdf", summary="Download PDF result")
async def download_pdf(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Download task result as searchable PDF file

    - **task_id**: Task UUID
    """
    # Get task
    task = task_service.get_task_by_id(
        db=db,
        task_id=task_id,
        user_id=current_user.id
    )

    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # Validate file access
    is_valid, error_msg = file_access_service.validate_file_access(
        db=db,
        user_id=current_user.id,
        task_id=task_id,
        file_path=task.result_pdf_path
    )

    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=error_msg
        )

    # Return file
    filename = f"{task.filename or task_id}_result.pdf"
    return FileResponse(
        path=task.result_pdf_path,
        filename=filename,
        media_type="application/pdf"
    )


@router.post("/{task_id}/start", response_model=TaskResponse, summary="Start task processing")
async def start_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Start processing a pending task

    - **task_id**: Task UUID
    """
    try:
        task = task_service.update_task_status(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
            status=TaskStatus.PROCESSING
        )

        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task not found"
            )

        logger.info(f"Started task {task_id} for user {current_user.email}")
        return task

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to start task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to start task: {str(e)}"
        )


@router.post("/{task_id}/cancel", response_model=TaskResponse, summary="Cancel task")
async def cancel_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Cancel a pending or processing task

    - **task_id**: Task UUID
    """
    try:
        # Get current task
        task = task_service.get_task_by_id(
            db=db,
            task_id=task_id,
            user_id=current_user.id
        )

        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task not found"
            )

        # Only allow canceling pending or processing tasks
        if task.status not in [TaskStatus.PENDING, TaskStatus.PROCESSING]:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Cannot cancel task in '{task.status.value}' status"
            )

        # Update to failed status with cancellation message
        task = task_service.update_task_status(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
            status=TaskStatus.FAILED,
            error_message="Task cancelled by user"
        )

        logger.info(f"Cancelled task {task_id} for user {current_user.email}")
        return task

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to cancel task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cancel task: {str(e)}"
        )


@router.post("/{task_id}/retry", response_model=TaskResponse, summary="Retry failed task")
async def retry_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Retry a failed task

    - **task_id**: Task UUID
    """
    try:
        # Get current task
        task = task_service.get_task_by_id(
            db=db,
            task_id=task_id,
            user_id=current_user.id
        )

        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Task not found"
            )

        # Only allow retrying failed tasks
        if task.status != TaskStatus.FAILED:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Cannot retry task in '{task.status.value}' status"
            )

        # Reset task to pending status
        task = task_service.update_task_status(
            db=db,
            task_id=task_id,
            user_id=current_user.id,
            status=TaskStatus.PENDING,
            error_message=None
        )

        logger.info(f"Retrying task {task_id} for user {current_user.email}")
        return task

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to retry task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retry task: {str(e)}"
        )
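

# ---------------------------------------------------------------------------
# Sketch (hypothetical, not used by the routes above): the status rules that
# /start, /cancel and /retry apply, collected into one lookup table so they
# could be unit-tested in one place. This assumes the TaskStatus values are
# the lowercase strings "pending", "processing" and "failed" (the routes map
# schema values with `.value.upper()`, which suggests this). The real enum
# lives in app.models.task and may differ. The "start" guard mirrors the
# /start docstring ("a pending task"); the route itself does not check the
# current status.
# ---------------------------------------------------------------------------
_ACTION_RULES = {
    # action: (statuses the action is allowed from, resulting status)
    "start": (frozenset({"pending"}), "processing"),
    "cancel": (frozenset({"pending", "processing"}), "failed"),
    "retry": (frozenset({"failed"}), "pending"),
}


def _next_status(action: str, current: str) -> str:
    """Return the status an action moves a task to.

    Raises ValueError (mapped to 400 by the routes) when the action is not
    allowed from the current status; raises KeyError for an unknown action.
    """
    allowed_from, target = _ACTION_RULES[action]
    if current not in allowed_from:
        # Same wording as the HTTP 400 detail used by cancel_task/retry_task
        raise ValueError(f"Cannot {action} task in '{current}' status")
    return target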