Remove all V1 architecture components and promote V2 to primary: - Delete all paddle_ocr_* table models (export, ocr, translation, user) - Delete legacy routers (auth, export, ocr, translation) - Delete legacy schemas and services - Promote user_v2.py to user.py as primary user model - Update all imports and dependencies to use V2 models only - Update main.py version to 2.0.0 Database changes: - Fix SQLAlchemy reserved word: rename audit_log.metadata to extra_data - Add migration to drop all paddle_ocr_* tables - Update alembic env to only import V2 models Frontend fixes: - Fix Select component exports in TaskHistoryPage.tsx - Update to use simplified Select API with options prop - Fix AxiosInstance TypeScript import syntax 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
212 lines
5.9 KiB
Python
212 lines
5.9 KiB
Python
"""
|
|
Tool_OCR - Admin Service
|
|
Administrative functions and statistics
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Dict
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, and_
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.models.user import User
|
|
from app.models.task import Task, TaskStatus
|
|
from app.models.session import Session as UserSession
|
|
from app.models.audit_log import AuditLog
|
|
from app.core.config import settings
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AdminService:
    """
    Service for administrative operations

    Offers the administrator check plus read-only reporting queries
    (system-wide statistics, a paginated user listing and a top-user
    ranking). The class keeps no per-instance state: every query method
    receives the SQLAlchemy session to use as an argument.
    """

    # Admin email addresses
    ADMIN_EMAILS = ["ymirliu@panjit.com.tw"]

    def is_admin(self, email: str) -> bool:
        """
        Check if user is an administrator

        Args:
            email: User email address. ``None`` or an empty string is
                treated as "not an admin" instead of raising.

        Returns:
            True if the address matches an entry of ``ADMIN_EMAILS``
            (compared case-insensitively)
        """
        if not email:
            # Accounts without an email can never match the allow-list.
            return False
        wanted = email.lower()
        return any(wanted == admin.lower() for admin in self.ADMIN_EMAILS)

    def get_system_statistics(self, db: Session) -> dict:
        """
        Get overall system statistics

        Args:
            db: Database session

        Returns:
            Dictionary with ``users``, ``tasks``, ``sessions`` and
            ``activity`` sections, each holding integer counts
        """
        # Take one timestamp so every cutoff below refers to the same instant.
        # NOTE(review): naive UTC is kept because the compared columns are
        # presumably stored as naive UTC -- confirm before switching to
        # timezone-aware datetimes.
        now = datetime.utcnow()
        date_30_days_ago = now - timedelta(days=30)
        date_7_days_ago = now - timedelta(days=7)

        # User statistics
        total_users = db.query(User).count()
        active_users = db.query(User).filter(User.is_active == True).count()

        # Active users whose last login falls within the last 30 days
        active_users_30d = db.query(User).filter(
            and_(
                User.last_login >= date_30_days_ago,
                User.is_active == True
            )
        ).count()

        # Task statistics: one count per TaskStatus member, so statuses with
        # no tasks are still reported (as 0).
        total_tasks = db.query(Task).count()
        tasks_by_status = {
            status.value: db.query(Task).filter(Task.status == status).count()
            for status in TaskStatus
        }

        # Session statistics: sessions that have not expired yet
        active_sessions = db.query(UserSession).filter(
            UserSession.expires_at > now
        ).count()

        # Recent activity (last 7 days)
        recent_tasks = db.query(Task).filter(
            Task.created_at >= date_7_days_ago
        ).count()

        recent_logins = db.query(AuditLog).filter(
            and_(
                AuditLog.event_type == "auth_login",
                AuditLog.created_at >= date_7_days_ago,
                AuditLog.success == 1
            )
        ).count()

        return {
            "users": {
                "total": total_users,
                "active": active_users,
                "active_30d": active_users_30d
            },
            "tasks": {
                "total": total_tasks,
                "by_status": tasks_by_status,
                "recent_7d": recent_tasks
            },
            "sessions": {
                "active": active_sessions
            },
            "activity": {
                "logins_7d": recent_logins,
                # Same figure as tasks.recent_7d, repeated in this section.
                "tasks_7d": recent_tasks
            }
        }

    @staticmethod
    def _count_by_user(db: Session, owner_column, user_ids: List, *criteria) -> Dict:
        """Return ``{user_id: row count}`` for rows owned by ``user_ids`` that match ``criteria``."""
        rows = (
            db.query(owner_column, func.count(owner_column))
            .filter(owner_column.in_(user_ids), *criteria)
            .group_by(owner_column)
            .all()
        )
        # Users without any matching row are simply absent from the mapping.
        return {owner_id: row_count for owner_id, row_count in rows}

    def get_user_list(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 50
    ) -> tuple[List[Dict], int]:
        """
        Get list of all users with statistics

        Users are returned newest first (by ``created_at``). Each entry is
        the user's ``to_dict()`` output extended with ``total_tasks``,
        ``completed_tasks``, ``active_sessions`` and ``is_admin``.

        Args:
            db: Database session
            skip: Pagination offset
            limit: Pagination limit

        Returns:
            Tuple of (user list, total count); the total counts all users,
            not only those on the requested page
        """
        # Get total count
        total = db.query(User).count()

        # Get users
        users = db.query(User).order_by(User.created_at.desc()).offset(skip).limit(limit).all()
        if not users:
            # Nothing on this page; skip the aggregate queries entirely.
            return [], total

        # Gather the per-user figures with three grouped queries for the
        # whole page instead of three COUNT queries per listed user.
        user_ids = [user.id for user in users]
        task_totals = self._count_by_user(db, Task.user_id, user_ids)
        completed_totals = self._count_by_user(
            db, Task.user_id, user_ids, Task.status == TaskStatus.COMPLETED
        )
        session_totals = self._count_by_user(
            db, UserSession.user_id, user_ids,
            UserSession.expires_at > datetime.utcnow()
        )

        # Enhance with statistics
        user_list = [
            {
                **user.to_dict(),
                "total_tasks": task_totals.get(user.id, 0),
                "completed_tasks": completed_totals.get(user.id, 0),
                "active_sessions": session_totals.get(user.id, 0),
                "is_admin": self.is_admin(user.email)
            }
            for user in users
        ]

        return user_list, total

    def get_top_users(
        self,
        db: Session,
        metric: str = "tasks",
        limit: int = 10
    ) -> List[Dict]:
        """
        Get top users by metric

        Args:
            db: Database session
            metric: Metric to rank by (tasks, completed_tasks); any other
                value falls back to ranking by total tasks
            limit: Number of users to return

        Returns:
            List of top users with counts, highest count first. Users with
            no matching task do not appear because of the inner join.
        """
        task_total = func.count(Task.id)
        query = db.query(User, task_total.label("task_count")).join(Task)

        if metric == "completed_tasks":
            # Rank by completed tasks only; otherwise every task counts.
            query = query.filter(Task.status == TaskStatus.COMPLETED)

        results = (
            query.group_by(User.id)
            .order_by(task_total.desc())
            .limit(limit)
            .all()
        )

        return [
            {
                "user_id": user.id,
                "email": user.email,
                "display_name": user.display_name,
                "count": count
            }
            for user, count in results
        ]
|
|
|
|
|
|
# Singleton instance
|
|
# Created once at import time; AdminService defines no __init__ or instance attributes.
admin_service = AdminService()
|