Files
OCR/backend/app/services/admin_service.py
egg 65abd51d60 feat: add translation billing stats and remove Export/Settings pages
- Add TranslationLog model to track translation API usage per task
- Integrate Dify API actual price (total_price) into translation stats
- Display translation statistics in admin dashboard with per-task costs
- Remove unused Export and Settings pages to simplify frontend
- Add GET /api/v2/admin/translation-stats endpoint

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 17:38:12 +08:00

297 lines
9.4 KiB
Python

"""
Tool_OCR - Admin Service
Administrative functions and statistics
"""
import logging
from typing import List, Dict
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from datetime import datetime, timedelta
from app.models.user import User
from app.models.task import Task, TaskStatus
from app.models.session import Session as UserSession
from app.models.audit_log import AuditLog
from app.models.translation_log import TranslationLog
from app.core.config import settings
logger = logging.getLogger(__name__)
class AdminService:
    """Service for administrative operations and dashboard statistics."""

    # Admin email addresses
    ADMIN_EMAILS = ["ymirliu@panjit.com.tw"]

    def is_admin(self, email: str) -> bool:
        """
        Check if user is an administrator

        Args:
            email: User email address (compared case-insensitively)

        Returns:
            True if the email is listed in ADMIN_EMAILS; False otherwise,
            including when email is None or empty
        """
        # Guard against missing emails instead of raising AttributeError.
        if not email:
            return False
        normalized = email.lower()
        return any(normalized == admin.lower() for admin in self.ADMIN_EMAILS)

    def get_system_statistics(self, db: Session) -> dict:
        """
        Get overall system statistics

        Args:
            db: Database session

        Returns:
            Dictionary with user, task and session totals, the number of
            tasks created in the last 7 days, and a per-status task breakdown
        """
        # Read the clock once so every time-based filter uses the same instant.
        now = datetime.utcnow()

        # User statistics
        total_users = db.query(User).count()
        # "== True" is intentional: it builds a SQL expression, not a Python bool.
        active_users = db.query(User).filter(User.is_active == True).count()  # noqa: E712

        # Task statistics, keyed by the enum's string value
        total_tasks = db.query(Task).count()
        tasks_by_status = {
            status.value: db.query(Task).filter(Task.status == status).count()
            for status in TaskStatus
        }

        # Session statistics (sessions that have not expired yet)
        active_sessions = db.query(UserSession).filter(
            UserSession.expires_at > now
        ).count()

        # Recent activity (tasks created in the last 7 days)
        recent_tasks = db.query(Task).filter(
            Task.created_at >= now - timedelta(days=7)
        ).count()

        return {
            "total_users": total_users,
            "active_users": active_users,
            "total_tasks": total_tasks,
            "total_sessions": active_sessions,
            "recent_activity_count": recent_tasks,
            "task_stats": {
                "pending": tasks_by_status.get("pending", 0),
                "processing": tasks_by_status.get("processing", 0),
                "completed": tasks_by_status.get("completed", 0),
                "failed": tasks_by_status.get("failed", 0)
            }
        }

    @staticmethod
    def _count_tasks_by_user(db: Session, user_ids: List, status=None) -> Dict:
        """Return {user_id: task count} for the given users, optionally limited to one status."""
        query = db.query(Task.user_id, func.count(Task.id)).filter(
            Task.user_id.in_(user_ids)
        )
        if status is not None:
            query = query.filter(Task.status == status)
        rows = query.group_by(Task.user_id).all()
        # Users without matching tasks produce no row; callers default to 0.
        return {user_id: task_count for user_id, task_count in rows}

    def get_user_list(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 50
    ) -> tuple[List[Dict], int]:
        """
        Get list of all users with statistics

        Args:
            db: Database session
            skip: Pagination offset
            limit: Pagination limit

        Returns:
            Tuple of (user list, total count). Each user entry is
            user.to_dict() extended with task_count, completed_tasks,
            failed_tasks, active_sessions and is_admin.
        """
        # Get total count (independent of pagination)
        total = db.query(User).count()

        # Get the requested page, newest users first
        users = (
            db.query(User)
            .order_by(User.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        if not users:
            # Nothing to enrich; also avoids issuing IN () with an empty list.
            return [], total

        # Aggregate per-user counters with one grouped query per metric
        # instead of four COUNT queries for every user on the page.
        user_ids = [user.id for user in users]
        task_counts = self._count_tasks_by_user(db, user_ids)
        completed_counts = self._count_tasks_by_user(
            db, user_ids, TaskStatus.COMPLETED
        )
        failed_counts = self._count_tasks_by_user(db, user_ids, TaskStatus.FAILED)

        session_rows = (
            db.query(UserSession.user_id, func.count())
            .filter(
                and_(
                    UserSession.user_id.in_(user_ids),
                    UserSession.expires_at > datetime.utcnow()
                )
            )
            .group_by(UserSession.user_id)
            .all()
        )
        session_counts = {user_id: count for user_id, count in session_rows}

        user_list = [
            {
                **user.to_dict(),
                "task_count": task_counts.get(user.id, 0),
                "completed_tasks": completed_counts.get(user.id, 0),
                "failed_tasks": failed_counts.get(user.id, 0),
                "active_sessions": session_counts.get(user.id, 0),
                "is_admin": self.is_admin(user.email)
            }
            for user in users
        ]
        return user_list, total

    def get_top_users(
        self,
        db: Session,
        metric: str = "tasks",
        limit: int = 10
    ) -> List[Dict]:
        """
        Get top users by metric

        Args:
            db: Database session
            metric: Metric to rank by. "completed_tasks" ranks by the number
                of completed tasks; any other value (including the default
                "tasks") ranks by the total number of tasks.
            limit: Number of users to return

        Returns:
            List of top users, each with both task_count and completed_tasks.
            Users with no tasks matching the ranking metric are not included
            because the ranking is built from an inner join on tasks.
        """
        rank_by_completed = metric == "completed_tasks"

        # Rank users by the number of (optionally completed-only) tasks
        ranking = db.query(
            User,
            func.count(Task.id).label("ranked_count")
        ).join(Task)
        if rank_by_completed:
            ranking = ranking.filter(Task.status == TaskStatus.COMPLETED)
        results = ranking.group_by(User.id).order_by(
            func.count(Task.id).desc()
        ).limit(limit).all()
        if not results:
            return []

        # The ranking query yields one of the two counters; fetch the other
        # one for all ranked users with a single grouped query.
        user_ids = [user.id for user, _ in results]
        ranked_counts = {user.id: ranked_count for user, ranked_count in results}
        if rank_by_completed:
            completed_counts = ranked_counts
            task_counts = self._count_tasks_by_user(db, user_ids)
        else:
            task_counts = ranked_counts
            completed_counts = self._count_tasks_by_user(
                db, user_ids, TaskStatus.COMPLETED
            )

        return [
            {
                "user_id": user.id,
                "email": user.email,
                "display_name": user.display_name,
                "task_count": task_counts.get(user.id, 0),
                "completed_tasks": completed_counts.get(user.id, 0)
            }
            for user, _ in results
        ]

    def get_translation_statistics(self, db: Session) -> dict:
        """
        Get translation usage statistics for admin dashboard.

        Args:
            db: Database session

        Returns:
            Dictionary with overall totals (translations, tokens, characters,
            estimated cost), a breakdown by target language, the 20 most
            recent translation logs, and count/tokens for the last 30 days
        """
        # Overall totals. Rows are unpacked positionally throughout this
        # method: on SQLAlchemy 1.4+ a result Row is a Sequence, so attribute
        # access such as row.count resolves to the sequence method rather
        # than a column labelled "count".
        (
            total_translations,
            total_tokens,
            total_input_tokens,
            total_output_tokens,
            total_characters,
            total_cost,
        ) = db.query(
            func.count(TranslationLog.id),
            func.sum(TranslationLog.total_tokens),
            func.sum(TranslationLog.input_tokens),
            func.sum(TranslationLog.output_tokens),
            func.sum(TranslationLog.total_characters),
            func.sum(TranslationLog.estimated_cost)
        ).one()

        # Breakdown by target language
        by_language = db.query(
            TranslationLog.target_lang,
            func.count(TranslationLog.id),
            func.sum(TranslationLog.total_tokens),
            func.sum(TranslationLog.total_characters)
        ).group_by(TranslationLog.target_lang).all()
        language_breakdown = [
            {
                "language": lang,
                "count": count,
                # SUM() yields NULL when every summed value is NULL
                "tokens": tokens or 0,
                "characters": chars or 0
            }
            for lang, count, tokens, chars in by_language
        ]

        # Recent translations (last 20)
        recent = db.query(TranslationLog).order_by(
            TranslationLog.created_at.desc()
        ).limit(20).all()
        recent_translations = [
            {
                "id": log.id,
                "task_id": log.task_id,
                "target_lang": log.target_lang,
                "total_tokens": log.total_tokens,
                "total_characters": log.total_characters,
                "processing_time_seconds": log.processing_time_seconds,
                "estimated_cost": log.estimated_cost,
                "created_at": log.created_at.isoformat() if log.created_at else None
            }
            for log in recent
        ]

        # Stats for last 30 days
        date_30_days_ago = datetime.utcnow() - timedelta(days=30)
        recent_count, recent_tokens = db.query(
            func.count(TranslationLog.id),
            func.sum(TranslationLog.total_tokens)
        ).filter(TranslationLog.created_at >= date_30_days_ago).one()

        # Aggregates over an empty table come back as NULL; report zeros.
        return {
            "total_translations": total_translations or 0,
            "total_tokens": total_tokens or 0,
            "total_input_tokens": total_input_tokens or 0,
            "total_output_tokens": total_output_tokens or 0,
            "total_characters": total_characters or 0,
            "estimated_cost": total_cost or 0.0,
            "by_language": language_breakdown,
            "recent_translations": recent_translations,
            "last_30_days": {
                "count": recent_count or 0,
                "tokens": recent_tokens or 0
            }
        }
# Shared module-level AdminService instance (used as a singleton)
admin_service = AdminService()