#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 管理員 API Author: PANJIT IT Team Created: 2024-01-28 Modified: 2024-01-28 """ from datetime import datetime, timedelta from flask import Blueprint, request, jsonify, g, send_file from app.utils.decorators import admin_required from app.utils.validators import validate_pagination, validate_date_range from app.utils.helpers import create_response from app.utils.exceptions import ValidationError from app.utils.logger import get_logger from app.models.user import User from app.models.job import TranslationJob from app.models.stats import APIUsageStats from app.utils.timezone import format_taiwan_time from app.models.log import SystemLog from app.models.cache import TranslationCache from sqlalchemy import func, desc admin_bp = Blueprint('admin', __name__, url_prefix='/admin') logger = get_logger(__name__) @admin_bp.route('/stats', methods=['GET']) @admin_required def get_system_stats(): """取得系統統計資料(簡化版本)""" try: from app import db # 基本統計 - 計算實際的總成本和今日活躍用戶 total_cost = db.session.query(func.sum(TranslationJob.total_cost)).scalar() or 0.0 # 計算今日活躍用戶 (今天有任務活動的用戶) today = datetime.utcnow().date() active_users_today = db.session.query(TranslationJob.user_id).filter( func.date(TranslationJob.created_at) == today ).distinct().count() overview = { 'total_jobs': TranslationJob.query.count(), 'completed_jobs': TranslationJob.query.filter_by(status='COMPLETED').count(), 'failed_jobs': TranslationJob.query.filter_by(status='FAILED').count(), 'pending_jobs': TranslationJob.query.filter_by(status='PENDING').count(), 'processing_jobs': TranslationJob.query.filter_by(status='PROCESSING').count(), 'total_users': User.query.count(), 'active_users_today': active_users_today, 'total_cost': float(total_cost) } # 用戶排行榜 - 按任務數和成本排序 user_rankings = db.session.query( User.id, User.display_name, func.count(TranslationJob.id).label('job_count'), func.sum(TranslationJob.total_cost).label('total_cost') ).outerjoin(TranslationJob).group_by( User.id, User.display_name ).order_by( func.count(TranslationJob.id).desc() ).limit(10).all() user_rankings_data = [] for ranking in user_rankings: user_rankings_data.append({ 'user_id': ranking.id, 'display_name': ranking.display_name, 'job_count': ranking.job_count or 0, 'total_cost': float(ranking.total_cost or 0.0) }) # 簡化的每日統計 - 只返回空數組 daily_stats = [] return jsonify(create_response( success=True, data={ 'overview': overview, 'daily_stats': daily_stats, 'user_rankings': user_rankings_data, 'period': 'month', 'start_date': format_taiwan_time(datetime.utcnow(), "%Y-%m-%d %H:%M:%S"), 'end_date': format_taiwan_time(datetime.utcnow(), "%Y-%m-%d %H:%M:%S") } )) except Exception as e: logger.error(f"Get system stats error: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='取得系統統計失敗' )), 500 @admin_bp.route('/jobs', methods=['GET']) @admin_required def get_all_jobs(): """取得所有使用者任務""" try: # 取得查詢參數 page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) user_id = request.args.get('user_id', type=int) status = request.args.get('status') # 驗證分頁參數 page, per_page = validate_pagination(page, min(per_page, 100)) # 建立查詢 query = TranslationJob.query # 使用者篩選 if user_id: query = query.filter_by(user_id=user_id) # 狀態篩選 if status and status != 'all': valid_statuses = ['PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'RETRY'] if status.upper() in valid_statuses: query = query.filter_by(status=status.upper()) # 排序 query = query.order_by(TranslationJob.created_at.desc()) # 分頁 pagination = query.paginate( page=page, per_page=per_page, error_out=False ) jobs = pagination.items # 組合回應資料(包含使用者資訊) jobs_data = [] for job in jobs: job_data = job.to_dict() job_data['user'] = { 'id': job.user.id, 'username': job.user.username, 'display_name': job.user.display_name, 'email': job.user.email } jobs_data.append(job_data) return jsonify(create_response( success=True, data={ 'jobs': jobs_data, 'pagination': { 'page': page, 'per_page': per_page, 'total': pagination.total, 'pages': pagination.pages, 'has_prev': pagination.has_prev, 'has_next': pagination.has_next } } )) except ValidationError as e: return jsonify(create_response( success=False, error=e.error_code, message=str(e) )), 400 except Exception as e: logger.error(f"Get all jobs error: {str(e)}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='取得任務列表失敗' )), 500 @admin_bp.route('/users', methods=['GET']) @admin_required def get_all_users(): """取得所有使用者(簡化版本)""" try: # 簡化版本 - 不使用分頁,直接返回所有用戶 users = User.query.order_by(User.created_at.desc()).limit(50).all() users_data = [] for user in users: # 直接構建基本用戶資料,不使用to_dict方法 users_data.append({ 'id': user.id, 'username': user.username, 'display_name': user.display_name, 'email': user.email, 'department': user.department or '', 'is_admin': user.is_admin, 'last_login': user.last_login.isoformat() if user.last_login else None, 'created_at': user.created_at.isoformat() if user.created_at else None, 'updated_at': user.updated_at.isoformat() if user.updated_at else None }) return jsonify(create_response( success=True, data={ 'users': users_data, 'pagination': { 'page': 1, 'per_page': 50, 'total': len(users_data), 'pages': 1, 'has_prev': False, 'has_next': False } } )) except Exception as e: logger.error(f"Get all users error: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='取得使用者列表失敗' )), 500 @admin_bp.route('/logs', methods=['GET']) @admin_required def get_system_logs(): """取得系統日誌""" try: # 取得查詢參數 page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 100, type=int) level = request.args.get('level') module = request.args.get('module') start_date = request.args.get('start_date') end_date = request.args.get('end_date') # 驗證參數 page, per_page = validate_pagination(page, min(per_page, 500)) if start_date or end_date: start_date, end_date = validate_date_range(start_date, end_date) # 取得日誌 logs = SystemLog.get_logs( level=level, module=module, start_date=start_date, end_date=end_date, limit=per_page, offset=(page - 1) * per_page ) # 取得總數(簡化版本,不完全精確) total = len(logs) if len(logs) < per_page else (page * per_page) + 1 logs_data = [log.to_dict() for log in logs] return jsonify(create_response( success=True, data={ 'logs': logs_data, 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'has_more': len(logs) == per_page } } )) except ValidationError as e: return jsonify(create_response( success=False, error=e.error_code, message=str(e) )), 400 except Exception as e: logger.error(f"Get system logs error: {str(e)}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='取得系統日誌失敗' )), 500 @admin_bp.route('/api-usage', methods=['GET']) @admin_required def get_api_usage(): """取得 API 使用統計(簡化版本)""" try: from app import db # 基本統計 total_calls = db.session.query(APIUsageStats).count() total_cost = db.session.query(func.sum(APIUsageStats.cost)).scalar() or 0.0 total_tokens = db.session.query(func.sum(APIUsageStats.total_tokens)).scalar() or 0 # 簡化版本返回基本數據 return jsonify(create_response( success=True, data={ 'daily_stats': [], # 簡化版本 'top_users': [], # 簡化版本 'endpoint_stats': [], # 簡化版本 'cost_trend': [], # 簡化版本 'period_days': 30, 'summary': { 'total_calls': total_calls, 'total_cost': float(total_cost), 'total_tokens': total_tokens } } )) except Exception as e: logger.error(f"Get API usage error: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='取得API使用統計失敗' )), 500 @admin_bp.route('/cache/stats', methods=['GET']) @admin_required def get_cache_stats(): """取得翻譯快取統計""" try: cache_stats = TranslationCache.get_cache_statistics() return jsonify(create_response( success=True, data=cache_stats )) except Exception as e: logger.error(f"Get cache stats error: {str(e)}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='取得快取統計失敗' )), 500 @admin_bp.route('/health', methods=['GET']) @admin_required def get_system_health(): """取得系統健康狀態(管理員專用)""" try: from datetime import datetime status = { 'timestamp': format_taiwan_time(datetime.utcnow(), "%Y-%m-%d %H:%M:%S"), 'status': 'healthy', 'services': {} } # 資料庫檢查 try: from app import db db.session.execute('SELECT 1') status['services']['database'] = {'status': 'healthy'} except Exception as e: status['services']['database'] = { 'status': 'unhealthy', 'error': str(e) } status['status'] = 'unhealthy' # 基本統計 try: total_jobs = TranslationJob.query.count() pending_jobs = TranslationJob.query.filter_by(status='PENDING').count() status['services']['translation_service'] = { 'status': 'healthy', 'total_jobs': total_jobs, 'pending_jobs': pending_jobs } except Exception as e: status['services']['translation_service'] = { 'status': 'unhealthy', 'error': str(e) } status['status'] = 'unhealthy' return jsonify(create_response( success=True, data=status )) except Exception as e: logger.error(f"Get system health error: {str(e)}") return jsonify({ 'timestamp': format_taiwan_time(datetime.utcnow(), "%Y-%m-%d %H:%M:%S"), 'status': 'error', 'error': str(e) }), 500 @admin_bp.route('/metrics', methods=['GET']) @admin_required def get_system_metrics(): """取得系統指標(管理員專用)""" try: from datetime import datetime, timedelta from app import db # 統計任務狀態 job_stats = db.session.query( TranslationJob.status, func.count(TranslationJob.id) ).group_by(TranslationJob.status).all() job_counts = {status: count for status, count in job_stats} # 最近24小時的統計 yesterday = datetime.utcnow() - timedelta(days=1) recent_jobs = db.session.query( TranslationJob.status, func.count(TranslationJob.id) ).filter( TranslationJob.created_at >= yesterday ).group_by(TranslationJob.status).all() recent_counts = {status: count for status, count in recent_jobs} metrics_data = { 'timestamp': format_taiwan_time(datetime.utcnow(), "%Y-%m-%d %H:%M:%S"), 'jobs': { 'pending': job_counts.get('PENDING', 0), 'processing': job_counts.get('PROCESSING', 0), 'completed': job_counts.get('COMPLETED', 0), 'failed': job_counts.get('FAILED', 0), 'retry': job_counts.get('RETRY', 0), 'total': sum(job_counts.values()) }, 'recent_24h': { 'pending': recent_counts.get('PENDING', 0), 'processing': recent_counts.get('PROCESSING', 0), 'completed': recent_counts.get('COMPLETED', 0), 'failed': recent_counts.get('FAILED', 0), 'retry': recent_counts.get('RETRY', 0), 'total': sum(recent_counts.values()) } } return jsonify(create_response( success=True, data=metrics_data )) except Exception as e: logger.error(f"Get system metrics error: {str(e)}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='取得系統指標失敗' )), 500 @admin_bp.route('/maintenance/cleanup', methods=['POST']) @admin_required def cleanup_system(): """系統清理維護""" try: data = request.get_json() or {} # 清理選項 cleanup_logs = data.get('cleanup_logs', False) cleanup_cache = data.get('cleanup_cache', False) cleanup_files = data.get('cleanup_files', False) logs_days = data.get('logs_days', 30) cache_days = data.get('cache_days', 90) files_days = data.get('files_days', 7) cleanup_results = {} # 清理舊日誌 if cleanup_logs: deleted_logs = SystemLog.cleanup_old_logs(days_to_keep=logs_days) cleanup_results['logs'] = { 'deleted_count': deleted_logs, 'days_kept': logs_days } # 清理舊快取 if cleanup_cache: deleted_cache = TranslationCache.clear_old_cache(days_to_keep=cache_days) cleanup_results['cache'] = { 'deleted_count': deleted_cache, 'days_kept': cache_days } # 清理舊檔案 if cleanup_files: try: from datetime import datetime, timedelta import os from pathlib import Path # 找到超過指定天數的已完成或失敗任務 cutoff_date = datetime.utcnow() - timedelta(days=files_days) old_jobs = TranslationJob.query.filter( TranslationJob.created_at < cutoff_date, TranslationJob.status.in_(['COMPLETED', 'FAILED']) ).all() deleted_files_count = 0 for job in old_jobs: try: # 刪除與任務相關的所有檔案 for file_record in job.files: file_path = Path(file_record.file_path) if file_path.exists(): os.remove(file_path) deleted_files_count += 1 # 也刪除任務目錄 if job.file_path: job_dir = Path(job.file_path).parent if job_dir.exists() and len(list(job_dir.iterdir())) == 0: job_dir.rmdir() except Exception as file_error: logger.warning(f"Failed to cleanup files for job {job.job_uuid}: {file_error}") cleanup_results['files'] = { 'deleted_count': deleted_files_count, 'jobs_processed': len(old_jobs), 'days_kept': files_days } except Exception as cleanup_error: cleanup_results['files'] = { 'error': f'File cleanup failed: {str(cleanup_error)}', 'days_kept': files_days } # 記錄維護日誌 SystemLog.info( 'admin.maintenance', f'System cleanup performed by {g.current_user.username}', user_id=g.current_user.id, extra_data={ 'cleanup_options': data, 'results': cleanup_results } ) logger.info(f"System cleanup performed by {g.current_user.username}") return jsonify(create_response( success=True, data=cleanup_results, message='系統清理完成' )) except Exception as e: logger.error(f"System cleanup error: {str(e)}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='系統清理失敗' )), 500 @admin_bp.route('/export/', methods=['GET']) @admin_required def export_report(report_type): """匯出報表""" try: from io import BytesIO import pandas as pd from datetime import datetime, timedelta from app import db # 驗證報表類型 valid_types = ['usage', 'cost', 'jobs'] if report_type not in valid_types: return jsonify(create_response( success=False, error='INVALID_REPORT_TYPE', message='無效的報表類型' )), 400 # 取得查詢參數 start_date = request.args.get('start_date') end_date = request.args.get('end_date') # 設定預設時間範圍(最近30天) if not end_date: end_date = datetime.utcnow() else: end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00')) if not start_date: start_date = end_date - timedelta(days=30) else: start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00')) # 生成報表數據 if report_type == 'usage': # 使用統計報表 data = generate_usage_report(start_date, end_date) filename = f'usage_report_{start_date.strftime("%Y%m%d")}_{end_date.strftime("%Y%m%d")}.xlsx' elif report_type == 'cost': # 成本分析報表 data = generate_cost_report(start_date, end_date) filename = f'cost_report_{start_date.strftime("%Y%m%d")}_{end_date.strftime("%Y%m%d")}.xlsx' elif report_type == 'jobs': # 任務清單報表 data = generate_jobs_report(start_date, end_date) filename = f'jobs_report_{start_date.strftime("%Y%m%d")}_{end_date.strftime("%Y%m%d")}.xlsx' # 建立Excel檔案 output = BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: for sheet_name, df in data.items(): df.to_excel(writer, sheet_name=sheet_name, index=False) output.seek(0) # 記錄匯出日誌 SystemLog.info( 'admin.export_report', f'Report exported: {report_type}', user_id=g.current_user.id, extra_data={ 'report_type': report_type, 'start_date': start_date.isoformat(), 'end_date': end_date.isoformat() } ) logger.info(f"Report exported by {g.current_user.username}: {report_type}") # 發送檔案 return send_file( BytesIO(output.getvalue()), mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=filename ) except Exception as e: logger.error(f"Export report error: {str(e)}") return jsonify(create_response( success=False, error='SYSTEM_ERROR', message='匯出報表失敗' )), 500 def generate_usage_report(start_date, end_date): """生成使用統計報表""" import pandas as pd from app import db # 用戶使用統計 user_stats = db.session.query( User.username, User.display_name, User.department, func.count(TranslationJob.id).label('job_count'), func.sum(TranslationJob.total_cost).label('total_cost'), func.sum(TranslationJob.total_tokens).label('total_tokens') ).outerjoin(TranslationJob).filter( TranslationJob.created_at.between(start_date, end_date) ).group_by( User.id, User.username, User.display_name, User.department ).order_by(func.count(TranslationJob.id).desc()).all() user_df = pd.DataFrame([{ '用戶名': stat.username, '顯示名稱': stat.display_name, '部門': stat.department or '', '任務數': stat.job_count or 0, '總成本 ($)': float(stat.total_cost or 0.0), '總Token數': stat.total_tokens or 0 } for stat in user_stats]) # 每日使用統計 daily_stats = db.session.query( func.date(TranslationJob.created_at).label('date'), func.count(TranslationJob.id).label('job_count'), func.sum(TranslationJob.total_cost).label('total_cost'), func.sum(TranslationJob.total_tokens).label('total_tokens') ).filter( TranslationJob.created_at.between(start_date, end_date) ).group_by( func.date(TranslationJob.created_at) ).order_by(func.date(TranslationJob.created_at)).all() daily_df = pd.DataFrame([{ '日期': stat.date.strftime('%Y-%m-%d'), '任務數': stat.job_count, '總成本 ($)': float(stat.total_cost or 0.0), '總Token數': stat.total_tokens or 0 } for stat in daily_stats]) return { '用戶使用統計': user_df, '每日使用統計': daily_df } def generate_cost_report(start_date, end_date): """生成成本分析報表""" import pandas as pd from app import db # 按語言的成本統計 lang_costs = {} jobs = TranslationJob.query.filter( TranslationJob.created_at.between(start_date, end_date), TranslationJob.total_cost.isnot(None) ).all() for job in jobs: for lang in job.target_languages: if lang not in lang_costs: lang_costs[lang] = {'count': 0, 'cost': 0.0, 'tokens': 0} lang_costs[lang]['count'] += 1 lang_costs[lang]['cost'] += float(job.total_cost or 0.0) / len(job.target_languages) lang_costs[lang]['tokens'] += (job.total_tokens or 0) // len(job.target_languages) lang_df = pd.DataFrame([{ '目標語言': lang, '任務數': data['count'], '總成本 ($)': data['cost'], '總Token數': data['tokens'], '平均單次成本 ($)': data['cost'] / data['count'] if data['count'] > 0 else 0 } for lang, data in lang_costs.items()]) # 按檔案類型的成本統計 file_stats = db.session.query( TranslationJob.file_extension, func.count(TranslationJob.id).label('job_count'), func.sum(TranslationJob.total_cost).label('total_cost'), func.sum(TranslationJob.total_tokens).label('total_tokens') ).filter( TranslationJob.created_at.between(start_date, end_date) ).group_by(TranslationJob.file_extension).all() file_df = pd.DataFrame([{ '檔案類型': stat.file_extension, '任務數': stat.job_count, '總成本 ($)': float(stat.total_cost or 0.0), '總Token數': stat.total_tokens or 0, '平均單次成本 ($)': float(stat.total_cost or 0.0) / stat.job_count if stat.job_count > 0 else 0 } for stat in file_stats]) return { '按語言成本分析': lang_df, '按檔案類型成本分析': file_df } def generate_jobs_report(start_date, end_date): """生成任務清單報表""" import pandas as pd from app import db jobs = db.session.query(TranslationJob).filter( TranslationJob.created_at.between(start_date, end_date) ).options(db.joinedload(TranslationJob.user)).order_by( TranslationJob.created_at.desc() ).all() jobs_df = pd.DataFrame([{ '任務ID': job.job_uuid, '用戶名': job.user.username if job.user else '', '顯示名稱': job.user.display_name if job.user else '', '部門': job.user.department if job.user and job.user.department else '', '原始檔案': job.original_filename, '檔案大小': job.file_size, '來源語言': job.source_language, '目標語言': ', '.join(job.target_languages), '狀態': job.status, '總成本 ($)': float(job.total_cost or 0.0), '總Token數': job.total_tokens or 0, '建立時間': job.created_at.strftime('%Y-%m-%d %H:%M:%S'), '完成時間': job.completed_at.strftime('%Y-%m-%d %H:%M:%S') if job.completed_at else '', '錯誤訊息': job.error_message or '' } for job in jobs]) return { '任務清單': jobs_df }