#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
User data model.

Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""

import logging
from datetime import datetime, timedelta

from sqlalchemy.sql import func

from app import db
from app.utils.timezone import format_taiwan_time

logger = logging.getLogger(__name__)

# E-mail address that is granted administrator rights when its user row is
# first created (hard-coded administrator).
_DEFAULT_ADMIN_EMAIL = 'ymirliu@panjit.com.tw'

# Timestamp layout used for every date/time field emitted by ``to_dict``.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


class User(db.Model):
    """User information table (dt_users)."""

    __tablename__ = 'dt_users'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(100), unique=True, nullable=False, index=True, comment='AD帳號')
    display_name = db.Column(db.String(200), nullable=False, comment='顯示名稱')
    email = db.Column(db.String(255), nullable=False, index=True, comment='電子郵件')
    department = db.Column(db.String(100), comment='部門')
    is_admin = db.Column(db.Boolean, default=False, comment='是否為管理員')
    last_login = db.Column(db.DateTime, comment='最後登入時間')
    created_at = db.Column(db.DateTime, default=func.now(), comment='建立時間')
    updated_at = db.Column(
        db.DateTime,
        default=func.now(),
        onupdate=func.now(),
        comment='更新時間'
    )

    # Relationships.
    # ``lazy='dynamic'`` makes each attribute a query object, which is why
    # ``to_dict`` can call ``.count()`` / ``.filter_by()`` on them directly.
    # Jobs and API usage rows are cascaded ('all, delete-orphan'); system logs
    # are declared without a cascade.
    translation_jobs = db.relationship('TranslationJob', backref='user', lazy='dynamic', cascade='all, delete-orphan')
    api_usage_stats = db.relationship('APIUsageStats', backref='user', lazy='dynamic', cascade='all, delete-orphan')
    system_logs = db.relationship('SystemLog', backref='user', lazy='dynamic')

    def __repr__(self) -> str:
        """Return a short debugging representation containing the username."""
        return f'<User {self.username}>'

    def to_dict(self, include_stats: bool = False) -> dict:
        """Convert the user to a plain dictionary.

        Args:
            include_stats: When true, also include job counts
                (``total_jobs``, ``completed_jobs``, ``failed_jobs``) and
                ``total_cost``. Each of these issues its own database query.

        Returns:
            A dict of the user's columns. Timestamps are rendered through
            ``format_taiwan_time`` as ``YYYY-MM-DD HH:MM:SS`` strings, or
            ``None`` when the underlying column is empty.
        """

        def _fmt(value):
            # Empty timestamps are reported as None rather than formatted.
            return format_taiwan_time(value, _TIMESTAMP_FORMAT) if value else None

        data = {
            'id': self.id,
            'username': self.username,
            'display_name': self.display_name,
            'email': self.email,
            'department': self.department,
            'is_admin': self.is_admin,
            'last_login': _fmt(self.last_login),
            'created_at': _fmt(self.created_at),
            'updated_at': _fmt(self.updated_at),
        }

        if include_stats:
            data.update({
                'total_jobs': self.translation_jobs.count(),
                'completed_jobs': self.translation_jobs.filter_by(status='COMPLETED').count(),
                'failed_jobs': self.translation_jobs.filter_by(status='FAILED').count(),
                'total_cost': self.get_total_cost(),
            })

        return data

    def get_total_cost(self):
        """Compute the user's total cost.

        Sums ``APIUsageStats.cost`` over all rows whose ``user_id`` matches
        this user.

        Returns:
            The summed cost, or ``0.0`` when there are no rows (or the sum is
            otherwise falsy) or when the lookup fails. This is best-effort:
            failures are logged with their traceback and never propagated.
        """
        user_id = None
        try:
            # Imported here rather than at module level, as in the original
            # code (presumably to avoid a circular import — confirm).
            from app.models.stats import APIUsageStats

            user_id = self.id
            total = db.session.query(
                func.sum(APIUsageStats.cost)
            ).filter(APIUsageStats.user_id == user_id).scalar()
            return total or 0.0
        except Exception:
            # Keep the best-effort contract, but leave a trace instead of
            # hiding the failure completely.
            logger.exception("Failed to compute total cost for user id=%s", user_id)
            return 0.0

    def update_last_login(self):
        """Set ``last_login`` to the current time and commit the session.

        The timestamp is a naive UTC ``datetime`` (``datetime.utcnow()``).
        ``db.session.commit()`` commits everything pending in the session,
        not only this change.
        """
        self.last_login = datetime.utcnow()
        db.session.commit()

    @classmethod
    def get_or_create(cls, username, display_name, email, department=None):
        """Fetch the user with the given username, creating it if missing.

        For an existing user, ``display_name`` and ``email`` are overwritten,
        ``department`` is overwritten only when a truthy value is supplied,
        and ``updated_at`` is set to the current naive UTC time. ``is_admin``
        of an existing user is left untouched.

        A new user is created with ``is_admin`` set to true only when
        ``email`` equals the hard-coded administrator address
        (case-insensitive comparison).

        Args:
            username: Account name used as the lookup key.
            display_name: Display name to store.
            email: E-mail address to store.
            department: Optional department name.

        Returns:
            The existing or newly created ``User``; the session has been
            committed before it is returned.
        """
        user = cls.query.filter_by(username=username).first()

        if user:
            # Refresh the stored profile information.
            user.display_name = display_name
            user.email = email
            if department:
                user.department = department
            user.updated_at = datetime.utcnow()
        else:
            # Create a new user.
            user = cls(
                username=username,
                display_name=display_name,
                email=email,
                department=department,
                is_admin=(email.lower() == _DEFAULT_ADMIN_EMAIL)  # hard-coded administrator
            )
            db.session.add(user)

        db.session.commit()
        return user

    @classmethod
    def get_admin_users(cls):
        """Return a list of all users whose ``is_admin`` flag is true."""
        return cls.query.filter_by(is_admin=True).all()

    @classmethod
    def get_active_users(cls, days=30):
        """Return active users, i.e. those who logged in within ``days`` days.

        Args:
            days: Size of the look-back window in days (default 30).

        Returns:
            A list of users whose ``last_login`` is at or after the cutoff,
            where the cutoff is the current naive UTC time minus ``days``.
        """
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        return cls.query.filter(cls.last_login >= cutoff_date).all()