Files
OCR/backend/app/services/admin_service.py
egg fd98018ddd refactor: complete V1 to V2 migration and remove legacy architecture
Remove all V1 architecture components and promote V2 to primary:
- Delete all paddle_ocr_* table models (export, ocr, translation, user)
- Delete legacy routers (auth, export, ocr, translation)
- Delete legacy schemas and services
- Promote user_v2.py to user.py as primary user model
- Update all imports and dependencies to use V2 models only
- Update main.py version to 2.0.0

Database changes:
- Fix SQLAlchemy reserved word: rename audit_log.metadata to extra_data
- Add migration to drop all paddle_ocr_* tables
- Update alembic env to only import V2 models

Frontend fixes:
- Fix Select component exports in TaskHistoryPage.tsx
- Update to use simplified Select API with options prop
- Fix AxiosInstance TypeScript import syntax

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-14 21:27:39 +08:00

212 lines
5.9 KiB
Python

"""
Tool_OCR - Admin Service
Administrative functions and statistics
"""
import logging
from typing import List, Dict
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from datetime import datetime, timedelta
from app.models.user import User
from app.models.task import Task, TaskStatus
from app.models.session import Session as UserSession
from app.models.audit_log import AuditLog
from app.core.config import settings
logger = logging.getLogger(__name__)
class AdminService:
"""Service for administrative operations"""
# Admin email addresses
ADMIN_EMAILS = ["ymirliu@panjit.com.tw"]
def is_admin(self, email: str) -> bool:
"""
Check if user is an administrator
Args:
email: User email address
Returns:
True if user is admin
"""
return email.lower() in [e.lower() for e in self.ADMIN_EMAILS]
def get_system_statistics(self, db: Session) -> dict:
"""
Get overall system statistics
Args:
db: Database session
Returns:
Dictionary with system stats
"""
# User statistics
total_users = db.query(User).count()
active_users = db.query(User).filter(User.is_active == True).count()
# Count users with logins in last 30 days
date_30_days_ago = datetime.utcnow() - timedelta(days=30)
active_users_30d = db.query(User).filter(
and_(
User.last_login >= date_30_days_ago,
User.is_active == True
)
).count()
# Task statistics
total_tasks = db.query(Task).count()
tasks_by_status = {}
for status in TaskStatus:
count = db.query(Task).filter(Task.status == status).count()
tasks_by_status[status.value] = count
# Session statistics
active_sessions = db.query(UserSession).filter(
UserSession.expires_at > datetime.utcnow()
).count()
# Recent activity (last 7 days)
date_7_days_ago = datetime.utcnow() - timedelta(days=7)
recent_tasks = db.query(Task).filter(
Task.created_at >= date_7_days_ago
).count()
recent_logins = db.query(AuditLog).filter(
and_(
AuditLog.event_type == "auth_login",
AuditLog.created_at >= date_7_days_ago,
AuditLog.success == 1
)
).count()
return {
"users": {
"total": total_users,
"active": active_users,
"active_30d": active_users_30d
},
"tasks": {
"total": total_tasks,
"by_status": tasks_by_status,
"recent_7d": recent_tasks
},
"sessions": {
"active": active_sessions
},
"activity": {
"logins_7d": recent_logins,
"tasks_7d": recent_tasks
}
}
def get_user_list(
self,
db: Session,
skip: int = 0,
limit: int = 50
) -> tuple[List[Dict], int]:
"""
Get list of all users with statistics
Args:
db: Database session
skip: Pagination offset
limit: Pagination limit
Returns:
Tuple of (user list, total count)
"""
# Get total count
total = db.query(User).count()
# Get users
users = db.query(User).order_by(User.created_at.desc()).offset(skip).limit(limit).all()
# Enhance with statistics
user_list = []
for user in users:
# Count user's tasks
task_count = db.query(Task).filter(Task.user_id == user.id).count()
# Count completed tasks
completed_tasks = db.query(Task).filter(
and_(
Task.user_id == user.id,
Task.status == TaskStatus.COMPLETED
)
).count()
# Count active sessions
active_sessions = db.query(UserSession).filter(
and_(
UserSession.user_id == user.id,
UserSession.expires_at > datetime.utcnow()
)
).count()
user_list.append({
**user.to_dict(),
"total_tasks": task_count,
"completed_tasks": completed_tasks,
"active_sessions": active_sessions,
"is_admin": self.is_admin(user.email)
})
return user_list, total
def get_top_users(
self,
db: Session,
metric: str = "tasks",
limit: int = 10
) -> List[Dict]:
"""
Get top users by metric
Args:
db: Database session
metric: Metric to rank by (tasks, completed_tasks)
limit: Number of users to return
Returns:
List of top users with counts
"""
if metric == "completed_tasks":
# Top users by completed tasks
results = db.query(
User,
func.count(Task.id).label("task_count")
).join(Task).filter(
Task.status == TaskStatus.COMPLETED
).group_by(User.id).order_by(
func.count(Task.id).desc()
).limit(limit).all()
else:
# Top users by total tasks (default)
results = db.query(
User,
func.count(Task.id).label("task_count")
).join(Task).group_by(User.id).order_by(
func.count(Task.id).desc()
).limit(limit).all()
return [
{
"user_id": user.id,
"email": user.email,
"display_name": user.display_name,
"count": count
}
for user, count in results
]
# Singleton instance
admin_service = AdminService()