#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Translation-related Celery tasks.

Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""

import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path

from celery import Celery, current_task
from celery.schedules import crontab

from app import create_app, db

logger = None


def get_celery_instance():
    """Create the application and return the Celery instance attached to it."""
    app = create_app()
    return app.celery


# Create the Celery instance used by the task decorators below.
celery = get_celery_instance()

# Initialise the logger. These imports intentionally stay below the Celery
# setup so the original module import order is preserved.
from app.utils.logger import get_logger
logger = get_logger(__name__)

from app.models.job import TranslationJob
from app.models.log import SystemLog
from app.services.translation_service import TranslationService
from app.services.notification_service import NotificationService
from app.utils.exceptions import TranslationError

# Delay in seconds before retry attempt 1, 2 and 3 of a translation job.
_RETRY_COUNTDOWNS = (30, 60, 120)

_BYTES_PER_MB = 1024 * 1024


@celery.task(bind=True, max_retries=3)
def process_translation_job(self, job_id: int):
    """Run a translation job, retrying with a growing delay on failure.

    Args:
        job_id: Primary key of the ``TranslationJob`` to process.

    Raises:
        celery.exceptions.Retry: Raised via ``self.retry`` while retries remain.
        Exception: The original error once retries are exhausted, or when the
            job cannot be loaded in the failure handler.
    """
    app = create_app()

    with app.app_context():
        # Bound before the try block so the failure handler can always
        # inspect it, even when the lookup itself raises.
        job = None
        try:
            # Load the job.
            job = TranslationJob.query.get(job_id)
            if not job:
                raise ValueError(f"Job {job_id} not found")

            logger.info(f"Starting translation job processing: {job.job_uuid}")

            # Record the start of the job.
            SystemLog.info(
                'tasks.translation',
                f'Translation job started: {job.job_uuid}',
                user_id=job.user_id,
                job_id=job.id,
                extra_data={
                    'filename': job.original_filename,
                    'target_languages': job.target_languages,
                    'retry_count': self.request.retries
                }
            )

            # Build the translation service and run the translation.
            translation_service = TranslationService()
            result = translation_service.translate_document(job.job_uuid)

            if result['success']:
                logger.info(f"Translation job completed successfully: {job.job_uuid}")

                # Completion notification is best-effort: a failure here must
                # not turn a successful translation into a failed task.
                try:
                    notification_service = NotificationService()
                    notification_service.send_job_completion_notification(job)
                except Exception as e:
                    logger.warning(f"Failed to send completion notification: {str(e)}")

                # Record the completion.
                SystemLog.info(
                    'tasks.translation',
                    f'Translation job completed: {job.job_uuid}',
                    user_id=job.user_id,
                    job_id=job.id,
                    extra_data={
                        'total_cost': result.get('total_cost', 0),
                        'total_sentences': result.get('total_sentences', 0),
                        'output_files': list(result.get('output_files', {}).keys())
                    }
                )
            else:
                raise TranslationError(result.get('error', 'Unknown translation error'))

        except Exception as exc:
            # `job` is None when the job was not found or the lookup raised;
            # fall back to the numeric id so logging cannot raise and mask
            # the real error.
            job_ref = job.job_uuid if job is not None else f"id={job_id}"
            logger.error(f"Translation job failed: {job_ref}. Error: {str(exc)}")

            with app.app_context():
                # Reload the job and update its status.
                job = TranslationJob.query.get(job_id)
                if job:
                    # Keep a plain-string reference for the final log line.
                    job_ref = job.job_uuid
                    job.error_message = str(exc)
                    job.retry_count = self.request.retries + 1

                    if self.request.retries < self.max_retries:
                        # Schedule a retry.
                        job.update_status('RETRY')

                        # Retry delays: 30s, 60s, 120s. The index is clamped so
                        # raising max_retries reuses the last delay instead of
                        # failing with IndexError.
                        countdown = _RETRY_COUNTDOWNS[
                            min(self.request.retries, len(_RETRY_COUNTDOWNS) - 1)
                        ]

                        SystemLog.warning(
                            'tasks.translation',
                            f'Translation job retry scheduled: {job.job_uuid} (attempt {self.request.retries + 2})',
                            user_id=job.user_id,
                            job_id=job.id,
                            extra_data={
                                'error': str(exc),
                                'retry_count': self.request.retries + 1,
                                'countdown': countdown
                            }
                        )

                        logger.info(f"Retrying translation job in {countdown}s: {job.job_uuid}")
                        raise self.retry(exc=exc, countdown=countdown)

                    else:
                        # Retries exhausted: mark the job as failed.
                        job.update_status('FAILED')

                        SystemLog.error(
                            'tasks.translation',
                            f'Translation job failed permanently: {job.job_uuid}',
                            user_id=job.user_id,
                            job_id=job.id,
                            extra_data={
                                'error': str(exc),
                                'total_retries': self.request.retries
                            }
                        )

                        # Failure notification is best-effort as well.
                        try:
                            notification_service = NotificationService()
                            notification_service.send_job_failure_notification(job)
                        except Exception as e:
                            logger.warning(f"Failed to send failure notification: {str(e)}")

            logger.error(f"Translation job failed permanently: {job_ref}")
            raise


@celery.task
def cleanup_old_files():
    """Delete expired job directories from the upload folder (periodic task).

    A directory is removed when its modification time is older than
    ``FILE_RETENTION_DAYS`` (default 7) and either no ``TranslationJob`` has a
    ``job_uuid`` equal to the directory name, or the matching job has a
    ``completed_at`` older than the cutoff.

    Returns:
        A dict summarising the run, or ``None`` when the upload folder does
        not exist.

    Raises:
        Exception: Any error outside the per-directory handling is logged and
            re-raised.
    """
    app = create_app()

    with app.app_context():
        try:
            logger.info("Starting file cleanup task")

            upload_folder = Path(app.config.get('UPLOAD_FOLDER'))
            retention_days = app.config.get('FILE_RETENTION_DAYS', 7)
            cutoff_date = datetime.utcnow() - timedelta(days=retention_days)

            if not upload_folder.exists():
                logger.warning(f"Upload folder does not exist: {upload_folder}")
                return

            deleted_dirs = 0
            total_size_freed = 0

            # Walk every directory directly under the upload folder; the
            # directory name is used as the job UUID.
            for item in upload_folder.iterdir():
                if not item.is_dir():
                    continue

                try:
                    # Convert the mtime to naive UTC so it is comparable with
                    # the utcnow()-based cutoff; a local-time conversion would
                    # skew retention by the server's UTC offset.
                    dir_mtime = datetime.utcfromtimestamp(item.stat().st_mtime)
                    if dir_mtime >= cutoff_date:
                        continue

                    # Measure the directory before it is removed.
                    dir_size = sum(
                        f.stat().st_size for f in item.rglob('*') if f.is_file()
                    )

                    # Check whether a database record still refers to it.
                    job_uuid = item.name
                    job = TranslationJob.query.filter_by(job_uuid=job_uuid).first()

                    if job:
                        # Only remove files of jobs that completed before the
                        # cutoff; jobs without completed_at are left alone.
                        if job.completed_at and job.completed_at < cutoff_date:
                            shutil.rmtree(item)
                            deleted_dirs += 1
                            total_size_freed += dir_size

                            logger.info(f"Cleaned up job directory: {job_uuid}")

                            # Record the cleanup.
                            SystemLog.info(
                                'tasks.cleanup',
                                f'Cleaned up files for completed job: {job_uuid}',
                                user_id=job.user_id,
                                job_id=job.id,
                                extra_data={
                                    'files_size_mb': dir_size / _BYTES_PER_MB,
                                    'retention_days': retention_days
                                }
                            )
                    else:
                        # No matching database record: remove the orphan.
                        shutil.rmtree(item)
                        deleted_dirs += 1
                        total_size_freed += dir_size

                        logger.info(f"Cleaned up orphaned directory: {job_uuid}")

                except Exception as e:
                    # One bad directory must not abort the whole run.
                    logger.error(f"Failed to process directory {item}: {str(e)}")
                    continue

            # Record the overall result.
            cleanup_result = {
                'deleted_directories': deleted_dirs,
                'total_size_freed_mb': total_size_freed / _BYTES_PER_MB,
                'retention_days': retention_days,
                'cutoff_date': cutoff_date.isoformat()
            }

            SystemLog.info(
                'tasks.cleanup',
                f'File cleanup completed: {deleted_dirs} directories, {total_size_freed / _BYTES_PER_MB:.2f} MB freed',
                extra_data=cleanup_result
            )

            logger.info(f"File cleanup completed: {cleanup_result}")

            return cleanup_result

        except Exception as e:
            logger.error(f"File cleanup task failed: {str(e)}")

            SystemLog.error(
                'tasks.cleanup',
                f'File cleanup task failed: {str(e)}',
                extra_data={'error': str(e)}
            )

            raise


@celery.task
def send_daily_admin_report():
    """Build the daily usage report and send it to the administrators.

    Returns:
        The value returned by
        ``NotificationService.send_admin_notification``; it is used as a
        truthy success flag here.

    Raises:
        Exception: Any error while building or sending the report is logged
            and re-raised.
    """
    app = create_app()

    with app.app_context():
        try:
            logger.info("Generating daily admin report")

            from app.models.stats import APIUsageStats

            # Yesterday's date is only used for the subject line when there
            # are no statistics to report.
            yesterday = datetime.utcnow() - timedelta(days=1)
            daily_stats = APIUsageStats.get_daily_statistics(days=1)

            # Summary of system errors over the same period.
            error_summary = SystemLog.get_error_summary(days=1)

            # Build the report content.
            if daily_stats:
                yesterday_data = daily_stats[0]
                subject = f"每日系統報告 - {yesterday_data['date']}"

                # NOTE(review): the line breaks inside this message were
                # reconstructed from a whitespace-collapsed source - confirm
                # against the intended layout.
                message = f"""
昨日系統使用狀況:
• 翻譯任務: {yesterday_data['total_calls']} 個
• 成功任務: {yesterday_data['successful_calls']} 個
• 失敗任務: {yesterday_data['failed_calls']} 個
• 總成本: ${yesterday_data['total_cost']:.4f}
• 總 Token 數: {yesterday_data['total_tokens']}

系統錯誤摘要:
• 錯誤數量: {error_summary['total_errors']}

請查看管理後台了解詳細資訊。
"""
            else:
                subject = f"每日系統報告 - {yesterday.strftime('%Y-%m-%d')}"
                message = "昨日無翻譯任務記錄。"

            # Send the notification to the administrators.
            notification_service = NotificationService()
            result = notification_service.send_admin_notification(subject, message)

            if result:
                logger.info("Daily admin report sent successfully")
            else:
                logger.warning("Failed to send daily admin report")

            return result

        except Exception as e:
            logger.error(f"Daily admin report task failed: {str(e)}")
            raise


# Periodic task schedule.
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the periodic tasks on ``sender`` once Celery is configured."""
    # File cleanup every day at 02:00.
    sender.add_periodic_task(
        crontab(hour=2, minute=0),
        cleanup_old_files.s(),
        name='cleanup-old-files-daily'
    )

    # Admin report every day at 08:00.
    sender.add_periodic_task(
        crontab(hour=8, minute=0),
        send_daily_admin_report.s(),
        name='daily-admin-report'
    )