#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Helper script that starts a Celery worker.

The script creates the Flask application through ``app.create_app`` and then
launches ``celery -A app.celery worker`` as a child process, echoing the
child's combined stdout/stderr to this console until it exits.
"""
import os
import subprocess
import sys
import traceback


def _force_utf8(stream):
    """Switch *stream* to UTF-8 when it is a reconfigurable text stream.

    The stream may be ``None`` or a replacement object without
    ``reconfigure``; in both cases it is left untouched instead of raising.
    """
    if stream is None or not hasattr(stream, 'reconfigure'):
        return
    if stream.encoding != 'utf-8':
        stream.reconfigure(encoding='utf-8')


# Fix encoding for Windows console
_force_utf8(sys.stdout)
_force_utf8(sys.stderr)

# Absolute directory of this script.  ``os.path.dirname(__file__)`` alone can
# be an empty string when the script is started via a relative file name,
# which would make a later ``os.chdir`` fail.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Add the app directory to sys.path
sys.path.insert(0, os.path.join(_SCRIPT_DIR, 'app'))

# Seconds to wait for the worker to exit after terminate() before killing it.
_SHUTDOWN_TIMEOUT = 10


def _stop_process(process):
    """Terminate *process* (if still running) and reap it.

    Falls back to ``kill()`` when the child does not exit within
    ``_SHUTDOWN_TIMEOUT`` seconds, so no orphaned worker is left behind.
    """
    if process is None or process.poll() is not None:
        return
    process.terminate()
    try:
        process.wait(timeout=_SHUTDOWN_TIMEOUT)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()


def start_celery_worker():
    """Start the Celery worker and stream its output to the console.

    Sets ``FLASK_ENV`` to ``development`` unless it is already defined,
    creates the Flask application, changes the working directory to the
    directory of this script, and runs the worker as a subprocess.

    Returns:
        int: The worker's exit code when its output stream ends, or ``1``
        when starting or monitoring the worker raised an exception.

    Raises:
        SystemExit: With code 0 after the worker has been stopped in
            response to ``KeyboardInterrupt`` (Ctrl+C).
    """
    print("正在啟動 Celery Worker...")

    # Set environment variables (an existing FLASK_ENV is kept)
    os.environ.setdefault('FLASK_ENV', 'development')

    # Import the application.  Kept local so that it runs after the
    # environment variable and sys.path have been prepared.
    from app import create_app

    app = create_app()
    print(f"Flask 應用已創建: {app}")
    print(f"Celery 實例: {app.celery}")

    # Start the Celery worker
    # Windows needs the --pool=solo argument
    print("正在啟動 Celery Worker(Windows 模式)...")

    # Launch the worker with the same interpreter that runs this script
    cmd = [
        sys.executable, '-m', 'celery',
        '-A', 'app.celery',
        'worker',
        '--loglevel=info',
        '--pool=solo',
    ]

    print(f"執行命令: {' '.join(cmd)}")

    # Explicit sentinel so the handlers below know whether a child exists.
    process = None
    try:
        # Switch to the script directory so ``-A app.celery`` resolves
        os.chdir(_SCRIPT_DIR)

        # Start the process; stderr is merged into stdout so a single pipe
        # carries all output.  ``errors='replace'`` keeps monitoring alive
        # if the child emits bytes that are not valid UTF-8.
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace',
            bufsize=1,  # line-buffered
        )

        print("Celery Worker 已啟動,PID:", process.pid)
        print("正在監控輸出...")

        # Show output in real time; the loop ends when the child closes
        # its end of the pipe.
        for line in process.stdout:
            print(line.rstrip())

        # Reap the child and hand its exit status to the caller.
        return process.wait()

    except KeyboardInterrupt:
        print("\n收到中斷信號,正在停止 Celery Worker...")
        _stop_process(process)
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report the failure, clean up, signal an error.
        print(f"啟動 Celery Worker 時發生錯誤: {e}")
        traceback.print_exc()
        _stop_process(process)
        return 1


if __name__ == "__main__":
    sys.exit(start_celery_worker())