fix: 新增分佈式鎖防止多 Worker 同時更新快取

多個 Gunicorn workers 同時執行快取更新會導致資料庫連接競爭,
造成 F5 重新整理時前端卡住。

變更內容:
- redis_client.py: 新增 try_acquire_lock/release_lock 函數
- cache_updater.py: WIP 與 Resource 快取更新加入分佈式鎖
- realtime_equipment_cache.py: 設備狀態快取更新加入分佈式鎖

鎖機制採用 Redis SET NX EX 原子操作,fail-open 設計確保
Redis 故障時不會阻塞正常操作。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2026-02-03 18:50:02 +08:00
parent d5aa2fa749
commit 5517e5f767
3 changed files with 107 additions and 20 deletions

View File

@@ -17,7 +17,9 @@ from mes_dashboard.core.redis_client import (
get_redis_client,
get_key,
redis_available,
REDIS_ENABLED
REDIS_ENABLED,
try_acquire_lock,
release_lock,
)
from mes_dashboard.core.database import read_sql_df
@@ -116,6 +118,8 @@ class CacheUpdater:
def _check_and_update(self, force: bool = False) -> bool:
"""Check SYS_DATE and update cache if needed.
Uses distributed lock to prevent multiple workers from updating simultaneously.
Args:
force: If True, update regardless of SYS_DATE.
@@ -126,6 +130,11 @@ class CacheUpdater:
logger.warning("Redis not available, skipping cache update")
return False
# Try to acquire distributed lock (non-blocking)
if not try_acquire_lock("wip_cache_update", ttl_seconds=120):
logger.debug("Another worker is updating WIP cache, skipping")
return False
try:
# Get current SYS_DATE from Oracle
oracle_sys_date = self._check_sys_date()
@@ -157,6 +166,8 @@ class CacheUpdater:
except Exception as e:
logger.error(f"Error in cache update: {e}", exc_info=True)
return False
finally:
release_lock("wip_cache_update")
def _check_sys_date(self) -> Optional[str]:
"""Query Oracle for MAX(SYS_DATE).
@@ -247,6 +258,8 @@ class CacheUpdater:
def _check_resource_update(self, force: bool = False) -> bool:
"""Check and update resource cache if needed.
Uses distributed lock to prevent multiple workers from updating simultaneously.
Args:
force: If True, update regardless of interval.
@@ -271,6 +284,11 @@ class CacheUpdater:
)
return False
# Try to acquire distributed lock (non-blocking)
if not try_acquire_lock("resource_cache_update", ttl_seconds=300):
logger.debug("Another worker is updating resource cache, skipping")
return False
# Perform sync
logger.info("Checking resource cache for updates...")
try:
@@ -280,6 +298,8 @@ class CacheUpdater:
except Exception as e:
logger.error(f"Resource cache update failed: {e}", exc_info=True)
return False
finally:
release_lock("resource_cache_update")
# ============================================================

View File

@@ -116,3 +116,55 @@ def close_redis() -> None:
logger.warning(f"Error closing Redis connection: {e}")
finally:
_REDIS_CLIENT = None
def try_acquire_lock(lock_name: str, ttl_seconds: int = 60) -> bool:
    """Attempt a non-blocking distributed lock via Redis ``SET NX EX``.

    Returns immediately; never waits on a held lock. Designed fail-open:
    if Redis is down or errors out, the caller is allowed to proceed so
    the lock can never block normal operation.

    Args:
        lock_name: Lock identifier (namespaced under the Redis key prefix).
        ttl_seconds: Auto-expiry for the lock, so a crashed holder cannot
            deadlock the other workers.

    Returns:
        True when the lock was acquired (or Redis is unavailable/failing),
        False when another process currently holds it.
    """
    client = get_redis_client()
    if client is None:
        # Fail-open: with no Redis there is nothing to coordinate on.
        logger.warning(f"Redis unavailable, skipping lock for {lock_name}")
        return True
    lock_key = f"{REDIS_KEY_PREFIX}:lock:{lock_name}"
    try:
        # SET with nx=True is atomic: it succeeds only if the key is absent,
        # and ex=ttl_seconds bounds how long a stale lock can linger.
        got_it = bool(client.set(lock_key, str(os.getpid()), nx=True, ex=ttl_seconds))
    except Exception as exc:
        # Fail-open on Redis errors as well.
        logger.warning(f"Failed to acquire lock {lock_name}: {exc}")
        return True
    if got_it:
        logger.debug(f"Acquired lock: {lock_name}")
    else:
        logger.debug(f"Lock already held: {lock_name}")
    return got_it
def release_lock(lock_name: str) -> None:
    """Release a distributed lock, but only if this process still owns it.

    The lock value is the PID written by try_acquire_lock(). Comparing it
    before deleting prevents a stale worker — one whose lock already expired
    via TTL — from deleting a lock that has since been re-acquired by a
    different worker. The get/compare/delete runs atomically server-side
    as a Lua script; a plain GET followed by DELETE would race.

    Args:
        lock_name: Name of the lock to release.
    """
    client = get_redis_client()
    if client is None:
        # Redis unavailable: nothing to release (acquire was fail-open too).
        return
    try:
        lock_key = f"{REDIS_KEY_PREFIX}:lock:{lock_name}"
        # Atomic compare-and-delete: delete only when the stored owner token
        # still matches our PID.
        compare_and_delete = (
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
            "return redis.call('del', KEYS[1]) else return 0 end"
        )
        released = client.eval(compare_and_delete, 1, lock_key, str(os.getpid()))
        if released:
            logger.debug(f"Released lock: {lock_name}")
        else:
            # Either the lock expired or another process now holds it.
            logger.debug(f"Lock {lock_name} not owned by this process; skipping release")
    except Exception as e:
        logger.warning(f"Failed to release lock {lock_name}: {e}")

View File

@@ -13,7 +13,12 @@ from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from mes_dashboard.core.database import read_sql_df
from mes_dashboard.core.redis_client import get_redis_client, get_key_prefix
from mes_dashboard.core.redis_client import (
get_redis_client,
get_key_prefix,
try_acquire_lock,
release_lock,
)
from mes_dashboard.config.constants import (
EQUIPMENT_STATUS_DATA_KEY,
EQUIPMENT_STATUS_INDEX_KEY,
@@ -541,35 +546,45 @@ def get_equipment_status_cache_status() -> Dict[str, Any]:
def refresh_equipment_status_cache(force: bool = False) -> bool:
"""Refresh equipment status cache.
Uses distributed lock to prevent multiple workers from refreshing simultaneously.
Args:
force: If True, refresh immediately regardless of state.
Returns:
True if refresh succeeded, False otherwise.
"""
with _SYNC_LOCK:
logger.info("Refreshing equipment status cache...")
start_time = time.time()
# Try to acquire distributed lock (non-blocking)
if not try_acquire_lock("equipment_status_cache_update", ttl_seconds=120):
logger.debug("Another worker is refreshing equipment status cache, skipping")
return False
# Load from Oracle
records = _load_equipment_status_from_oracle()
if records is None:
logger.error("Failed to load equipment status from Oracle")
return False
try:
with _SYNC_LOCK:
logger.info("Refreshing equipment status cache...")
start_time = time.time()
# Aggregate
aggregated = _aggregate_by_resourceid(records)
# Load from Oracle
records = _load_equipment_status_from_oracle()
if records is None:
logger.error("Failed to load equipment status from Oracle")
return False
# Save to Redis
success = _save_to_redis(aggregated)
# Aggregate
aggregated = _aggregate_by_resourceid(records)
elapsed = time.time() - start_time
if success:
logger.info(f"Equipment status cache refreshed in {elapsed:.2f}s")
else:
logger.error(f"Equipment status cache refresh failed after {elapsed:.2f}s")
# Save to Redis
success = _save_to_redis(aggregated)
return success
elapsed = time.time() - start_time
if success:
logger.info(f"Equipment status cache refreshed in {elapsed:.2f}s")
else:
logger.error(f"Equipment status cache refresh failed after {elapsed:.2f}s")
return success
finally:
release_lock("equipment_status_cache_update")
def _sync_worker(interval: int):