#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Celery task management service.

Helpers for obtaining the Celery application, revoking translation tasks,
inspecting the tasks that workers report as active, and failing translation
jobs that have been stuck in the PROCESSING state.

Author: PANJIT IT Team
Created: 2025-09-04
"""

import os
from datetime import datetime, timedelta, timezone

from celery import Celery

from app.utils.logger import get_logger

logger = get_logger(__name__)


def _task_id(job_uuid):
    """Return the Celery task id this module associates with *job_uuid*."""
    # Single source of truth so revoke/lookup can never drift apart.
    return f"translate_document_{job_uuid}"


def _active_task_ids(active_tasks):
    """Flatten a ``{worker: [task, ...]}`` mapping into a set of task ids."""
    return {
        task.get('id')
        for tasks in active_tasks.values()
        # A worker entry may carry no task list; treat it as empty.
        for task in (tasks or ())
    }


def get_celery_app():
    """Return the Celery application instance.

    Prefers the project's ``celery_app.app``. If that module cannot be
    imported, a minimal Celery instance named ``translation_worker`` is
    created, using the ``REDIS_URL`` environment variable as broker
    (default ``redis://localhost:6379/0``).

    Returns:
        Celery: the application instance.
    """
    try:
        from celery_app import app as celery_app
    except ImportError:
        # Fall back to a bare Celery instance when the project app is
        # not importable.
        broker_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
        return Celery('translation_worker', broker=broker_url)
    return celery_app


def revoke_task(job_uuid):
    """Revoke the Celery task associated with a job.

    Args:
        job_uuid (str): UUID of the job.

    Returns:
        bool: True if the revoke request was issued without raising,
        False otherwise. True does not confirm that a worker actually
        stopped the task.
    """
    try:
        celery_app = get_celery_app()

        task_id = _task_id(job_uuid)

        # terminate=True also stops a task that is already executing.
        celery_app.control.revoke(task_id, terminate=True, signal='SIGKILL')

        logger.info(f"Successfully revoked Celery task: {task_id}")
        return True

    except Exception as e:
        logger.error(f"Failed to revoke Celery task for job {job_uuid}: {str(e)}")
        return False


def get_active_tasks():
    """Return the tasks that workers currently report as active.

    Returns:
        dict: mapping of worker name to that worker's list of active
        tasks. An empty dict is returned when the inspect call yields
        nothing or when any error occurs (the error is logged).
    """
    try:
        celery_app = get_celery_app()

        inspector = celery_app.control.inspect()
        # active() yields a falsy value when there is nothing to report.
        return inspector.active() or {}

    except Exception as e:
        logger.error(f"Failed to get active tasks: {str(e)}")
        return {}


def is_task_active(job_uuid):
    """Check whether the task for a job is active on any worker.

    Args:
        job_uuid (str): UUID of the job.

    Returns:
        bool: True if a worker reports the task as active; False if it is
        not reported or if the check itself failed (the error is logged).
    """
    try:
        return _task_id(job_uuid) in _active_task_ids(get_active_tasks())

    except Exception as e:
        logger.error(f"Failed to check if task is active for job {job_uuid}: {str(e)}")
        return False


def cleanup_stale_tasks(stale_minutes=30):
    """Mark stuck translation jobs as failed.

    A job is considered stale when its status is ``PROCESSING``, its
    ``processing_started_at`` is older than *stale_minutes*, and no worker
    reports its task as active.

    Args:
        stale_minutes (int | float): age in minutes after which a
            processing job is considered stale. Defaults to 30.

    Returns:
        int: number of jobs marked as failed; 0 if an error occurred
        (the error is logged).
    """
    try:
        # Imported here rather than at module level, as in the original
        # (presumably to avoid a circular import -- TODO confirm).
        from app.models.job import TranslationJob

        # Naive UTC timestamp, equivalent to the deprecated
        # datetime.utcnow(), so the comparison against the stored column
        # behaves as before.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        stale_threshold = now_utc - timedelta(minutes=stale_minutes)

        stale_jobs = TranslationJob.query.filter(
            TranslationJob.status == 'PROCESSING',
            TranslationJob.processing_started_at < stale_threshold
        ).all()

        if not stale_jobs:
            return 0

        # Query the workers once for the whole batch instead of issuing
        # one inspect broadcast per stale job.
        # NOTE(review): get_active_tasks() returns {} on failure, so an
        # unreachable broker is indistinguishable from "no active tasks"
        # and stale jobs will still be failed in that case.
        active_ids = _active_task_ids(get_active_tasks())

        cleanup_count = 0
        for job in stale_jobs:
            if _task_id(job.job_uuid) in active_ids:
                continue

            # Task is not active in Celery: mark the job as failed.
            job.update_status('FAILED', error_message='任務處理超時,已自動取消')
            cleanup_count += 1
            logger.info(f"Cleaned up stale job: {job.job_uuid}")

        return cleanup_count

    except Exception as e:
        logger.error(f"Failed to cleanup stale tasks: {str(e)}")
        return 0