"""
Tool_OCR - Admin Router
Administrative endpoints for system management
"""

import logging
from typing import Optional
from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session

from app.core.deps import get_db, get_current_admin_user
from app.core.config import settings
from app.models.user import User
from app.models.task import TaskStatus
from app.services.admin_service import admin_service
from app.services.audit_service import audit_service
from app.services.task_service import task_service
from app.services.cleanup_service import cleanup_service
from app.services.cleanup_scheduler import get_cleanup_scheduler

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v2/admin", tags=["Admin"])


@router.get("/stats", summary="Get system statistics")
async def get_system_stats(
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Return the system-wide statistics produced by the admin service.

    Any failure while collecting the statistics is logged and reported
    as HTTP 500.

    Requires admin privileges
    """
    try:
        # The service result is passed through untouched.
        return admin_service.get_system_statistics(db)
    except Exception as e:
        logger.exception("Failed to get system stats")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get system stats: {str(e)}"
        )


@router.get("/users", summary="List all users")
async def list_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=100),
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Return one page of users as provided by the admin service.

    - **page**: 1-based page number (default: 1)
    - **page_size**: Users per page, 1-100 (default: 50)

    The response carries the page of users, the total count, the paging
    parameters that were applied, and a `has_more` flag that is true
    while users remain beyond the current page.

    Requires admin privileges
    """
    try:
        # Translate the 1-based page number into a row offset.
        offset = (page - 1) * page_size
        users, total = admin_service.get_user_list(db, skip=offset, limit=page_size)

        # More pages exist while the rows served so far fall short of the total.
        more_available = offset + len(users) < total

        return {
            "users": users,
            "total": total,
            "page": page,
            "page_size": page_size,
            "has_more": more_available,
        }
    except Exception as e:
        logger.exception("Failed to list users")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list users: {str(e)}"
        )

@router.get("/users/top", summary="Get top users")
async def get_top_users(
    # NOTE(review): newer FastAPI releases deprecate `regex=` in favour of
    # `pattern=`; left as-is because the project's minimum FastAPI version is
    # not visible from this file.
    metric: str = Query("tasks", regex="^(tasks|completed_tasks)$"),
    limit: int = Query(10, ge=1, le=50),
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get top users by metric

    - **metric**: Ranking metric (tasks or completed_tasks)
    - **limit**: Number of users to return (1-50, default: 10)

    Requires admin privileges
    """
    try:
        return admin_service.get_top_users(db, metric=metric, limit=limit)
    except Exception as e:
        logger.exception("Failed to get top users")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get top users: {str(e)}"
        ) from e


def _parse_iso_date_param(value: Optional[str], field_name: str) -> Optional[datetime]:
    """
    Parse an optional ISO-format date/datetime query parameter.

    Returns None when the value is missing or empty. Raises
    HTTPException (400) naming the offending parameter when the value
    cannot be parsed, so that client input errors are not reported as
    server errors.
    """
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid {field_name}: {value}"
        ) from e


@router.get("/audit-logs", summary="Get audit logs")
async def get_audit_logs(
    user_id: Optional[int] = Query(None),
    event_category: Optional[str] = Query(None),
    event_type: Optional[str] = Query(None),
    date_from: Optional[str] = Query(None),
    date_to: Optional[str] = Query(None),
    success_only: Optional[bool] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(100, ge=1, le=500),
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get audit logs with filtering

    - **user_id**: Filter by user ID (optional)
    - **event_category**: Filter by category (authentication, task, admin, system)
    - **event_type**: Filter by event type (optional)
    - **date_from**: Filter from date (YYYY-MM-DD, optional)
    - **date_to**: Filter to date (YYYY-MM-DD, optional)
    - **success_only**: Filter by success status (optional)
    - **page**: 1-based page number (default: 1)
    - **page_size**: Logs per page, 1-500 (default: 100)

    Returns HTTP 400 when **date_from** or **date_to** is not a valid
    ISO-format date.

    Requires admin privileges
    """
    # Validate the dates before entering the try block so a malformed value
    # yields 400 rather than being swallowed into the generic 500 handler.
    date_from_dt = _parse_iso_date_param(date_from, "date_from")
    date_to_dt = _parse_iso_date_param(date_to, "date_to")

    try:
        skip = (page - 1) * page_size

        logs, total = audit_service.get_logs(
            db=db,
            user_id=user_id,
            event_category=event_category,
            event_type=event_type,
            date_from=date_from_dt,
            date_to=date_to_dt,
            success_only=success_only,
            skip=skip,
            limit=page_size
        )

        return {
            "logs": [log.to_dict() for log in logs],
            "total": total,
            "page": page,
            "page_size": page_size,
            "has_more": (skip + len(logs)) < total
        }
    except Exception as e:
        logger.exception("Failed to get audit logs")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get audit logs: {str(e)}"
        ) from e


@router.get("/audit-logs/user/{user_id}/summary", summary="Get user activity summary")
async def get_user_activity_summary(
    user_id: int,
    days: int = Query(30, ge=1, le=365),
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get user activity summary for the last N days

    - **user_id**: User ID
    - **days**: Number of days to look back (1-365, default: 30)

    Requires admin privileges
    """
    try:
        return audit_service.get_user_activity_summary(db, user_id=user_id, days=days)
    except Exception as e:
        logger.exception(f"Failed to get activity summary for user {user_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get user activity summary: {str(e)}"
        ) from e


@router.get("/translation-stats", summary="Get translation statistics")
async def get_translation_stats(
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get translation usage statistics for billing and monitoring.

    Returns:
    - total_translations: Total number of translation jobs
    - total_tokens: Sum of all tokens used
    - total_characters: Sum of all characters translated
    - estimated_cost: Estimated cost based on token pricing
    - by_language: Breakdown by target language
    - recent_translations: List of recent translation activities
    - last_30_days: Statistics for the last 30 days

    Requires admin privileges.
    """
    try:
        return admin_service.get_translation_statistics(db)
    except Exception as e:
        logger.exception("Failed to get translation statistics")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get translation statistics: {str(e)}"
        ) from e


@router.get("/tasks", summary="List all tasks (admin)")
async def list_all_tasks(
    user_id: Optional[int] = Query(None, description="Filter by user ID"),
    status_filter: Optional[str] = Query(None, description="Filter by status"),
    include_deleted: bool = Query(True, description="Include soft-deleted tasks"),
    include_files_deleted: bool = Query(True, description="Include tasks with deleted files"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=100),
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get list of all tasks across all users.
    Includes soft-deleted tasks and tasks with deleted files by default.

    - **user_id**: Filter by user ID (optional)
    - **status_filter**: Filter by status (pending, processing, completed, failed)
    - **include_deleted**: Include soft-deleted tasks (default: true)
    - **include_files_deleted**: Include tasks with deleted files (default: true)
    - **page**: 1-based page number (default: 1)
    - **page_size**: Tasks per page, 1-100 (default: 50)

    Returns HTTP 400 when **status_filter** is not a valid task status.

    Requires admin privileges.
    """
    try:
        # Parse status filter
        task_status = None
        if status_filter:
            try:
                task_status = TaskStatus(status_filter)
            except ValueError as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Invalid status: {status_filter}"
                ) from e

        skip = (page - 1) * page_size

        tasks, total = task_service.get_all_tasks_admin(
            db=db,
            user_id=user_id,
            status=task_status,
            include_deleted=include_deleted,
            include_files_deleted=include_files_deleted,
            skip=skip,
            limit=page_size
        )

        return {
            "tasks": [task.to_dict() for task in tasks],
            "total": total,
            "page": page,
            "page_size": page_size,
            "has_more": (skip + len(tasks)) < total
        }
    except HTTPException:
        # Let the deliberate 400 above through instead of rewrapping it as 500.
        raise
    except Exception as e:
        logger.exception("Failed to list tasks")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list tasks: {str(e)}"
        ) from e


@router.get("/tasks/{task_id}", summary="Get task details (admin)")
async def get_task_admin(
    task_id: str,
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get detailed information about a specific task (admin view).
    Can access any task regardless of ownership or deletion status.

    Returns HTTP 404 when no task with the given ID is found.

    Requires admin privileges.
    """
    try:
        task = task_service.get_task_by_id_admin(db, task_id)
        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task not found: {task_id}"
            )
        return task.to_dict()
    except HTTPException:
        # Let the deliberate 404 above through instead of rewrapping it as 500.
        raise
    except Exception as e:
        logger.exception(f"Failed to get task {task_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get task: {str(e)}"
        ) from e


@router.get("/storage/stats", summary="Get storage statistics")
async def get_storage_stats(
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get storage usage statistics.

    Returns:
    - total_tasks: Total number of tasks
    - tasks_with_files: Tasks that still have files on disk
    - tasks_files_deleted: Tasks where files have been cleaned up
    - soft_deleted_tasks: Tasks that have been soft-deleted
    - disk_usage: Actual disk usage in bytes and MB
    - per_user: Breakdown by user

    Requires admin privileges.
    """
    try:
        return cleanup_service.get_storage_stats(db)
    except Exception as e:
        logger.exception("Failed to get storage stats")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get storage stats: {str(e)}"
        ) from e


@router.get("/cleanup/status", summary="Get cleanup scheduler status")
async def get_cleanup_status(
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Get the status of the automatic cleanup scheduler.

    Returns:
    - enabled: Whether cleanup is enabled in configuration
    - running: Whether scheduler is currently running
    - interval_hours: Hours between cleanup runs
    - max_files_per_user: Files to keep per user
    - last_run: Timestamp of last cleanup
    - next_run: Estimated next cleanup time
    - last_result: Result of last cleanup

    Requires admin privileges.
    """
    try:
        scheduler = get_cleanup_scheduler()
        return scheduler.status
    except Exception as e:
        logger.exception("Failed to get cleanup status")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get cleanup status: {str(e)}"
        ) from e


@router.post("/cleanup/trigger", summary="Trigger file cleanup")
async def trigger_cleanup(
    max_files_per_user: Optional[int] = Query(None, description="Override max files per user"),
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """
    Manually trigger file cleanup process.
    Deletes old files while preserving database records.

    - **max_files_per_user**: Override the default retention count (optional)

    Returns cleanup statistics including files deleted and space freed.
    The cleanup action is also recorded in the audit log.

    Requires admin privileges.
    """
    try:
        # NOTE(review): because of `or`, an explicit 0 is treated like "not
        # provided" and falls back to the configured default. Confirm whether
        # 0 ("keep nothing") should ever be honoured before changing this.
        files_to_keep = max_files_per_user or settings.max_files_per_user
        result = cleanup_service.cleanup_all_users(db, max_files_per_user=files_to_keep)

        logger.info(
            f"Manual cleanup triggered by admin {admin_user.username}: "
            f"{result['total_files_deleted']} files, {result['total_bytes_freed']} bytes"
        )

        # Log admin cleanup action
        audit_service.log_event(
            db=db,
            event_type="admin_cleanup",
            event_category="admin",
            description=f"Manual cleanup: {result['total_files_deleted']} files, {result['total_bytes_freed']} bytes freed",
            user_id=admin_user.id,
            success=True,
            metadata={
                "files_deleted": result['total_files_deleted'],
                "bytes_freed": result['total_bytes_freed'],
                "users_processed": result['users_processed'],
                "max_files_per_user": files_to_keep
            }
        )

        return {
            "success": True,
            "message": "Cleanup completed successfully",
            **result
        }
    except Exception as e:
        logger.exception("Failed to trigger cleanup")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to trigger cleanup: {str(e)}"
        ) from e