- Add TranslationLog model to track translation API usage per task
- Integrate Dify API actual price (total_price) into translation stats
- Display translation statistics in admin dashboard with per-task costs
- Remove unused Export and Settings pages to simplify frontend
- Add GET /api/v2/admin/translation-stats endpoint

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
297 lines
9.4 KiB
Python
297 lines
9.4 KiB
Python
"""
|
|
Tool_OCR - Admin Service
|
|
Administrative functions and statistics
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Dict
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, and_
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.models.user import User
|
|
from app.models.task import Task, TaskStatus
|
|
from app.models.session import Session as UserSession
|
|
from app.models.audit_log import AuditLog
|
|
from app.models.translation_log import TranslationLog
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AdminService:
    """Service for administrative operations.

    Provides the admin check plus aggregate statistics (users, tasks,
    sessions, translation usage). Every query method takes an open
    SQLAlchemy session and only issues SELECT-style queries against it.
    """

    # Admin email addresses
    ADMIN_EMAILS = ["ymirliu@panjit.com.tw"]

    @staticmethod
    def _status_key(status):
        """Return the string form of a task status (enum member or raw value)."""
        return getattr(status, "value", status)

    def is_admin(self, email: str) -> bool:
        """
        Check if user is an administrator

        The comparison is case-insensitive. A missing or empty email is
        never an administrator.

        Args:
            email: User email address

        Returns:
            True if user is admin
        """
        if not email:
            return False
        wanted = email.lower()
        return any(wanted == admin.lower() for admin in self.ADMIN_EMAILS)

    def get_system_statistics(self, db: Session) -> dict:
        """
        Get overall system statistics

        Args:
            db: Database session

        Returns:
            Dictionary with ``total_users``, ``active_users``,
            ``total_tasks``, ``total_sessions`` (sessions that have not
            expired yet), ``recent_activity_count`` (tasks created in the
            last 7 days) and ``task_stats`` (pending / processing /
            completed / failed counts).
        """
        # Take one timestamp so every time window in this report agrees.
        now = datetime.utcnow()

        # User statistics
        total_users = db.query(User).count()
        active_users = db.query(User).filter(User.is_active == True).count()  # noqa: E712

        # Task statistics: one grouped query instead of one COUNT per status.
        total_tasks = db.query(Task).count()
        tasks_by_status = {status.value: 0 for status in TaskStatus}
        status_rows = (
            db.query(Task.status, func.count(Task.id))
            .group_by(Task.status)
            .all()
        )
        for status, count in status_rows:
            tasks_by_status[self._status_key(status)] = count

        # Session statistics
        active_sessions = db.query(UserSession).filter(
            UserSession.expires_at > now
        ).count()

        # Recent activity (last 7 days)
        recent_tasks = db.query(Task).filter(
            Task.created_at >= now - timedelta(days=7)
        ).count()

        return {
            "total_users": total_users,
            "active_users": active_users,
            "total_tasks": total_tasks,
            "total_sessions": active_sessions,
            "recent_activity_count": recent_tasks,
            "task_stats": {
                "pending": tasks_by_status.get("pending", 0),
                "processing": tasks_by_status.get("processing", 0),
                "completed": tasks_by_status.get("completed", 0),
                "failed": tasks_by_status.get("failed", 0),
            },
        }

    def get_user_list(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 50
    ) -> tuple[List[Dict], int]:
        """
        Get list of all users with statistics

        Users are ordered newest first. Each entry is ``user.to_dict()``
        extended with ``task_count``, ``completed_tasks``, ``failed_tasks``,
        ``active_sessions`` and ``is_admin``.

        Args:
            db: Database session
            skip: Pagination offset
            limit: Pagination limit

        Returns:
            Tuple of (user list, total count)
        """
        total = db.query(User).count()

        users = (
            db.query(User)
            .order_by(User.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        if not users:
            return [], total

        user_ids = [user.id for user in users]

        # Per-user task counts for the whole page in one grouped query
        # (previously three COUNT queries per user).
        completed_key = TaskStatus.COMPLETED.value
        failed_key = TaskStatus.FAILED.value
        task_totals: Dict = {}
        completed_counts: Dict = {}
        failed_counts: Dict = {}
        task_rows = (
            db.query(Task.user_id, Task.status, func.count(Task.id))
            .filter(Task.user_id.in_(user_ids))
            .group_by(Task.user_id, Task.status)
            .all()
        )
        for user_id, status, count in task_rows:
            task_totals[user_id] = task_totals.get(user_id, 0) + count
            status_key = self._status_key(status)
            if status_key == completed_key:
                completed_counts[user_id] = count
            elif status_key == failed_key:
                failed_counts[user_id] = count

        # Unexpired sessions per user, again in a single grouped query.
        session_rows = (
            db.query(UserSession.user_id, func.count(UserSession.user_id))
            .filter(
                and_(
                    UserSession.user_id.in_(user_ids),
                    UserSession.expires_at > datetime.utcnow()
                )
            )
            .group_by(UserSession.user_id)
            .all()
        )
        session_counts = {user_id: count for user_id, count in session_rows}

        user_list = [
            {
                **user.to_dict(),
                "task_count": task_totals.get(user.id, 0),
                "completed_tasks": completed_counts.get(user.id, 0),
                "failed_tasks": failed_counts.get(user.id, 0),
                "active_sessions": session_counts.get(user.id, 0),
                "is_admin": self.is_admin(user.email),
            }
            for user in users
        ]

        return user_list, total

    def get_top_users(
        self,
        db: Session,
        metric: str = "tasks",
        limit: int = 10
    ) -> List[Dict]:
        """
        Get top users by metric

        Args:
            db: Database session
            metric: Metric to rank by. ``"tasks"`` ranks by total task
                count; ``"completed_tasks"`` ranks by completed task count
                (users without a completed task are then not listed).
                Any other value is treated as ``"tasks"``.
            limit: Number of users to return

        Returns:
            List of dicts with ``user_id``, ``email``, ``display_name``,
            ``task_count`` and ``completed_tasks``, best-ranked first
        """
        rank_by_completed = metric == "completed_tasks"
        ranked_count = func.count(Task.id)

        # Ranking query: users joined to their tasks, optionally restricted
        # to completed tasks so the count being ordered on matches `metric`.
        ranking = db.query(User, ranked_count.label("ranked_count")).join(Task)
        if rank_by_completed:
            ranking = ranking.filter(Task.status == TaskStatus.COMPLETED)
        ranked = (
            ranking.group_by(User.id)
            .order_by(ranked_count.desc())
            .limit(limit)
            .all()
        )
        if not ranked:
            return []

        # Fetch the count that was NOT used for ranking for all listed users
        # in one grouped query instead of one COUNT per user.
        user_ids = [user.id for user, _ in ranked]
        other = db.query(Task.user_id, func.count(Task.id)).filter(
            Task.user_id.in_(user_ids)
        )
        if not rank_by_completed:
            other = other.filter(Task.status == TaskStatus.COMPLETED)
        other_counts = {
            user_id: count
            for user_id, count in other.group_by(Task.user_id).all()
        }

        top_users = []
        for user, ranked_value in ranked:
            other_value = other_counts.get(user.id, 0)
            if rank_by_completed:
                task_count, completed_tasks = other_value, ranked_value
            else:
                task_count, completed_tasks = ranked_value, other_value
            top_users.append({
                "user_id": user.id,
                "email": user.email,
                "display_name": user.display_name,
                "task_count": task_count,
                "completed_tasks": completed_tasks,
            })

        return top_users

    def get_translation_statistics(self, db: Session) -> dict:
        """
        Get translation usage statistics for admin dashboard.

        Args:
            db: Database session

        Returns:
            Dictionary with overall totals (translations, tokens,
            characters, estimated cost), a per-target-language breakdown,
            the 20 most recent translation logs and the count/tokens for
            the last 30 days
        """
        # Overall totals. Rows are unpacked positionally throughout this
        # method: a result row also carries tuple-style count()/index()
        # methods, so reading a column labelled "count" by attribute can
        # return the method instead of the value.
        (
            total_translations,
            total_tokens,
            total_input_tokens,
            total_output_tokens,
            total_characters,
            total_cost,
        ) = db.query(
            func.count(TranslationLog.id),
            func.sum(TranslationLog.total_tokens),
            func.sum(TranslationLog.input_tokens),
            func.sum(TranslationLog.output_tokens),
            func.sum(TranslationLog.total_characters),
            func.sum(TranslationLog.estimated_cost),
        ).first()

        # Breakdown by target language
        by_language = db.query(
            TranslationLog.target_lang,
            func.count(TranslationLog.id),
            func.sum(TranslationLog.total_tokens),
            func.sum(TranslationLog.total_characters),
        ).group_by(TranslationLog.target_lang).all()

        language_breakdown = [
            {
                "language": lang,
                "count": count,
                "tokens": tokens or 0,
                "characters": chars or 0,
            }
            for lang, count, tokens, chars in by_language
        ]

        # Recent translations (last 20)
        recent = db.query(TranslationLog).order_by(
            TranslationLog.created_at.desc()
        ).limit(20).all()

        recent_translations = [
            {
                "id": log.id,
                "task_id": log.task_id,
                "target_lang": log.target_lang,
                "total_tokens": log.total_tokens,
                "total_characters": log.total_characters,
                "processing_time_seconds": log.processing_time_seconds,
                "estimated_cost": log.estimated_cost,
                "created_at": log.created_at.isoformat() if log.created_at else None,
            }
            for log in recent
        ]

        # Stats for last 30 days
        date_30_days_ago = datetime.utcnow() - timedelta(days=30)
        recent_count, recent_tokens = db.query(
            func.count(TranslationLog.id),
            func.sum(TranslationLog.total_tokens),
        ).filter(TranslationLog.created_at >= date_30_days_ago).first()

        # SUM() yields NULL/None on an empty table, hence the `or 0` fallbacks.
        return {
            "total_translations": total_translations or 0,
            "total_tokens": total_tokens or 0,
            "total_input_tokens": total_input_tokens or 0,
            "total_output_tokens": total_output_tokens or 0,
            "total_characters": total_characters or 0,
            "estimated_cost": total_cost or 0.0,
            "by_language": language_breakdown,
            "recent_translations": recent_translations,
            "last_30_days": {
                "count": recent_count or 0,
                "tokens": recent_tokens or 0,
            },
        }
|
|
|
|
|
|
# Singleton instance
|
|
# Module-level AdminService instance created once at import time.
admin_service = AdminService()
|