#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WebSocket service module.

Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""

from functools import wraps
import logging

from flask import request
from flask_jwt_extended import decode_token, get_jwt
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect

# Initialise SocketIO without an application; an app is attached later
# through ``socketio.init_app``.
# NOTE(review): cors_allowed_origins="*" accepts every origin — confirm this
# is intended outside of development.
socketio = SocketIO(
    cors_allowed_origins="*",
    async_mode='threading',
    logger=True,
    engineio_logger=False
)

# Active connections, keyed by Socket.IO session id (``request.sid``).
# Each value is ``{'user_id': ..., 'sid': ...}``.
connected_users = {}

logger = logging.getLogger(__name__)


def jwt_required_ws(f):
    """Decorate a Socket.IO handler so that it only runs with a valid JWT.

    The token is read from the ``token`` query-string parameter of the
    current request. When it decodes successfully and carries a ``sub``
    claim, that claim is stored on ``request.user_id`` and the wrapped
    handler is invoked.

    Args:
        f: The handler to protect.

    Returns:
        A wrapper that returns whatever ``f`` returns, or ``False`` after
        calling ``disconnect()`` when the token is missing, cannot be
        decoded, has no ``sub`` claim, or when ``f`` itself raises.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            # Token travels in the query string of the request
            token = request.args.get('token')
            if not token:
                disconnect()
                return False

            decoded = decode_token(token)
            user_id = decoded.get('sub')
            if user_id is None:
                # Without an identity the handler would run as user "None"
                logger.warning("WebSocket token has no 'sub' claim")
                disconnect()
                return False

            # Expose the identity to the wrapped handler
            request.user_id = user_id

            return f(*args, **kwargs)

        except Exception as e:
            logger.error(f"WebSocket authentication failed: {e}")
            disconnect()
            return False

    return decorated_function


@socketio.on('connect')
def handle_connect(auth):
    """Authenticate a connecting client and register its session.

    The JWT is taken from ``auth['token']``. On success the session is
    recorded in ``connected_users``, the client joins the room
    ``user_<user_id>`` and receives a ``connected`` event.

    Args:
        auth: Authentication payload supplied by the client; expected to be
            a mapping containing a ``token`` entry.

    Returns:
        ``True`` when the client was registered. ``False`` (after calling
        ``disconnect()``) when no token was supplied, the token has no
        ``sub`` claim, or any step raised an exception.
    """
    try:
        if not (auth and 'token' in auth):
            logger.warning("Connection attempt without authentication")
            disconnect()
            return False

        decoded = decode_token(auth['token'])
        user_id = decoded.get('sub')
        if user_id is None:
            # Refuse identity-less tokens: otherwise every such client would
            # be joined to the same shared room "user_None".
            logger.warning("Connection attempt with a token that has no 'sub' claim")
            disconnect()
            return False

        # Record the connection
        connected_users[request.sid] = {
            'user_id': user_id,
            'sid': request.sid
        }

        # Join the per-user room used for targeted notifications
        join_room(f"user_{user_id}")

        logger.info(f"User {user_id} connected with session {request.sid}")

        # Tell the client that the connection was accepted
        emit('connected', {
            'message': '連接成功',
            'user_id': user_id
        })

        return True

    except Exception as e:
        logger.error(f"Connection error: {e}")
        disconnect()
        return False


@socketio.on('disconnect')
def handle_disconnect():
    """Remove a disconnecting client from ``connected_users`` and its room.

    Sessions that were never registered are ignored. Any exception is logged
    and suppressed.
    """
    try:
        # Drop the registry entry first so that it cannot be left behind if
        # leaving the room fails.
        user_info = connected_users.pop(request.sid, None)
        if user_info is not None:
            user_id = user_info['user_id']

            # Leave the per-user room
            leave_room(f"user_{user_id}")

            logger.info(f"User {user_id} disconnected")

    except Exception as e:
        logger.error(f"Disconnect error: {e}")

@socketio.on('ping')
def handle_ping(data=None):
    """Answer a heartbeat with a ``pong`` event echoing a timestamp.

    Args:
        data: Optional event payload. When it is a mapping with a
            ``timestamp`` entry, that value is echoed back.

    The ``timestamp`` query-string parameter of the connection request is
    used as a fallback when the payload does not provide one; the echoed
    value is ``None`` when neither is present.
    """
    timestamp = None
    if isinstance(data, dict):
        timestamp = data.get('timestamp')
    if timestamp is None:
        # Fallback: the timestamp from the connection's query string
        timestamp = request.args.get('timestamp')
    emit('pong', {'timestamp': timestamp})


@socketio.on('subscribe_job')
def handle_subscribe_job(data=None):
    """Subscribe the current client to updates for one job.

    Args:
        data: Event payload; expected to be a mapping with a ``job_uuid``
            entry.

    With a usable ``job_uuid`` the client joins the room ``job_<job_uuid>``
    and receives a ``subscribed`` event. A missing or malformed payload is
    logged and ignored. Exceptions are logged and suppressed.
    """
    try:
        job_uuid = data.get('job_uuid') if isinstance(data, dict) else None
        if not job_uuid:
            logger.warning(f"Client {request.sid} sent subscribe_job without a job_uuid")
            return

        join_room(f"job_{job_uuid}")
        logger.info(f"Client {request.sid} subscribed to job {job_uuid}")
        emit('subscribed', {'job_uuid': job_uuid})
    except Exception as e:
        logger.error(f"Subscribe job error: {e}")


@socketio.on('unsubscribe_job')
def handle_unsubscribe_job(data=None):
    """Unsubscribe the current client from updates for one job.

    Args:
        data: Event payload; expected to be a mapping with a ``job_uuid``
            entry.

    With a usable ``job_uuid`` the client leaves the room ``job_<job_uuid>``
    and receives an ``unsubscribed`` event. A missing or malformed payload is
    logged and ignored. Exceptions are logged and suppressed.
    """
    try:
        job_uuid = data.get('job_uuid') if isinstance(data, dict) else None
        if not job_uuid:
            logger.warning(f"Client {request.sid} sent unsubscribe_job without a job_uuid")
            return

        leave_room(f"job_{job_uuid}")
        logger.info(f"Client {request.sid} unsubscribed from job {job_uuid}")
        emit('unsubscribed', {'job_uuid': job_uuid})
    except Exception as e:
        logger.error(f"Unsubscribe job error: {e}")


# Utility functions: server-originated messages
def send_notification_to_user(user_id, notification_data):
    """
    Send a notification to one user.

    Emits ``new_notification`` to the room ``user_<user_id>`` on the default
    namespace. Failures are logged and suppressed.

    Args:
        user_id: User ID
        notification_data: Notification payload
    """
    try:
        socketio.emit(
            'new_notification',
            notification_data,
            room=f"user_{user_id}",
            namespace='/'
        )
        logger.info(f"Notification sent to user {user_id}")

    except Exception as e:
        logger.error(f"Failed to send notification: {e}")


def send_job_update(job_uuid, update_data):
    """
    Send a job update.

    Emits ``job_update`` to the room ``job_<job_uuid>`` on the default
    namespace. The payload is ``update_data`` plus a ``job_uuid`` entry; a
    ``job_uuid`` key inside ``update_data`` takes precedence. Failures are
    logged and suppressed.

    Args:
        job_uuid: Job UUID
        update_data: Mapping with the update fields
    """
    try:
        socketio.emit(
            'job_update',
            {
                'job_uuid': job_uuid,
                **update_data
            },
            room=f"job_{job_uuid}",
            namespace='/'
        )
        logger.info(f"Job update sent for {job_uuid}")

    except Exception as e:
        logger.error(f"Failed to send job update: {e}")


def broadcast_system_message(message, message_type='info'):
    """
    Broadcast a system message to every connected client.

    Emits ``system_message`` on the default namespace. Failures are logged
    and suppressed.

    Args:
        message: Message text
        message_type: Message type
    """
    try:
        # A server-level emit without a room is delivered to all clients of
        # the namespace. ``broadcast=True`` is deliberately not passed:
        # it is redundant here and recent Flask-SocketIO / python-socketio
        # releases reject it with a TypeError, which the handler below would
        # swallow, silently dropping the message.
        socketio.emit(
            'system_message',
            {
                'message': message,
                'type': message_type
            },
            namespace='/'
        )
        logger.info(f"System message broadcasted: {message}")

    except Exception as e:
        logger.error(f"Failed to broadcast system message: {e}")


# Initialisation
def init_websocket(app):
    """
    Initialise WebSocket support.

    Args:
        app: Flask application instance

    Returns:
        The module-level ``socketio`` object after ``init_app(app)``.
    """
    socketio.init_app(app)
    logger.info("WebSocket initialized")
    return socketio