perf: 為設備快取加入 Process-Level Cache 防止 GIL 競爭
- 為 resource_cache 和 realtime_equipment_cache 加入 30 秒 TTL 的進程級快取
- 使用 Double-Check Locking 確保只有一個執行緒解析 JSON
- 背景同步更新 Redis 時主動清除進程級快取
- 修復測試以清除進程級快取避免測試干擾

解決 F5 刷新設備即時概況頁面時因併發 JSON 解析導致的卡頓問題

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -10,7 +10,7 @@ import logging
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from mes_dashboard.core.database import read_sql_df
|
||||
from mes_dashboard.core.redis_client import get_redis_client, get_key_prefix
|
||||
@@ -24,6 +24,44 @@ from mes_dashboard.config.constants import (
|
||||
|
||||
logger = logging.getLogger('mes_dashboard.realtime_equipment_cache')
|
||||
|
||||
# ============================================================
|
||||
# Process-Level Cache (Prevents redundant JSON parsing)
|
||||
# ============================================================
|
||||
|
||||
class _ProcessLevelCache:
|
||||
"""Thread-safe process-level cache for parsed equipment status data."""
|
||||
|
||||
def __init__(self, ttl_seconds: int = 30):
|
||||
self._cache: Dict[str, Tuple[List[Dict[str, Any]], float]] = {}
|
||||
self._lock = threading.Lock()
|
||||
self._ttl = ttl_seconds
|
||||
|
||||
def get(self, key: str) -> Optional[List[Dict[str, Any]]]:
|
||||
"""Get cached data if not expired."""
|
||||
with self._lock:
|
||||
if key not in self._cache:
|
||||
return None
|
||||
data, timestamp = self._cache[key]
|
||||
if time.time() - timestamp > self._ttl:
|
||||
del self._cache[key]
|
||||
return None
|
||||
return data
|
||||
|
||||
def set(self, key: str, data: List[Dict[str, Any]]) -> None:
|
||||
"""Cache data with current timestamp."""
|
||||
with self._lock:
|
||||
self._cache[key] = (data, time.time())
|
||||
|
||||
def invalidate(self, key: str) -> None:
|
||||
"""Remove a key from cache."""
|
||||
with self._lock:
|
||||
self._cache.pop(key, None)
|
||||
|
||||
|
||||
# Global process-level cache for equipment status (30s TTL)
|
||||
_equipment_status_cache = _ProcessLevelCache(ttl_seconds=30)
|
||||
_equipment_status_parse_lock = threading.Lock()
|
||||
|
||||
# ============================================================
|
||||
# Module State
|
||||
# ============================================================
|
||||
@@ -282,6 +320,9 @@ def _save_to_redis(aggregated: List[Dict[str, Any]]) -> bool:
|
||||
pipe.set(count_key, str(count))
|
||||
pipe.execute()
|
||||
|
||||
# Invalidate process-level cache so next request picks up new data
|
||||
_equipment_status_cache.invalidate("equipment_status_all")
|
||||
|
||||
logger.info(f"Saved {count} equipment status records to Redis")
|
||||
return True
|
||||
|
||||
@@ -295,31 +336,62 @@ def _save_to_redis(aggregated: List[Dict[str, Any]]) -> bool:
|
||||
# ============================================================
|
||||
|
||||
def get_all_equipment_status() -> List[Dict[str, Any]]:
    """Get all equipment status from cache with process-level caching.

    Uses a two-tier cache strategy:
      1. Process-level cache: parsed data (30s TTL) — fast, no JSON parsing.
      2. Redis cache: raw JSON data — shared across workers.

    A double-checked lock around the Redis read/parse ensures that only one
    thread parses the JSON payload at a time; concurrent requests wait on the
    lock and then reuse the freshly cached result instead of re-parsing.

    Returns:
        List of equipment status records, or empty list if unavailable.
    """
    cache_key = "equipment_status_all"

    # Tier 1: Check process-level cache first (fast path, no lock contention
    # beyond the cache's own short critical section).
    cached_data = _equipment_status_cache.get(cache_key)
    if cached_data is not None:
        logger.debug(f"Process cache hit: {len(cached_data)} records")
        return cached_data

    # Tier 2: Parse from Redis (slow path — needs the parse lock).
    redis_client = get_redis_client()
    if not redis_client:
        logger.warning("Redis client not available for equipment status query")
        return []

    # Use lock to prevent multiple threads from parsing simultaneously.
    with _equipment_status_parse_lock:
        # Double-check after acquiring lock: another thread may have already
        # parsed and cached the data while we were waiting.
        cached_data = _equipment_status_cache.get(cache_key)
        if cached_data is not None:
            logger.debug(f"Process cache hit (after lock): {len(cached_data)} records")
            return cached_data

        try:
            start_time = time.time()
            prefix = get_key_prefix()
            data_key = f"{prefix}:{EQUIPMENT_STATUS_DATA_KEY}"

            data_json = redis_client.get(data_key)
            if not data_json:
                logger.debug("No equipment status data in cache")
                return []

            data = json.loads(data_json)
            parse_time = time.time() - start_time

            # Store in process-level cache so concurrent/following requests
            # skip the JSON parse for the next TTL window.
            _equipment_status_cache.set(cache_key, data)

            logger.debug(f"Equipment status cache hit: {len(data)} records (parsed in {parse_time:.2f}s)")
            return data

        except Exception as exc:
            # Best-effort read: degrade to an empty list rather than failing
            # the request when Redis or the payload is unhealthy.
            logger.error(f"Failed to get equipment status from cache: {exc}")
            return []
|
||||
def get_equipment_status_by_id(resource_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get equipment status by RESOURCEID.
|
||||
|
||||
@@ -11,8 +11,10 @@ import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import pandas as pd
|
||||
|
||||
@@ -32,6 +34,44 @@ from mes_dashboard.sql import QueryBuilder
|
||||
|
||||
logger = logging.getLogger('mes_dashboard.resource_cache')
|
||||
|
||||
# ============================================================
|
||||
# Process-Level Cache (Prevents redundant JSON parsing)
|
||||
# ============================================================
|
||||
|
||||
class _ProcessLevelCache:
|
||||
"""Thread-safe process-level cache for parsed DataFrames."""
|
||||
|
||||
def __init__(self, ttl_seconds: int = 30):
|
||||
self._cache: Dict[str, Tuple[pd.DataFrame, float]] = {}
|
||||
self._lock = threading.Lock()
|
||||
self._ttl = ttl_seconds
|
||||
|
||||
def get(self, key: str) -> Optional[pd.DataFrame]:
|
||||
"""Get cached DataFrame if not expired."""
|
||||
with self._lock:
|
||||
if key not in self._cache:
|
||||
return None
|
||||
df, timestamp = self._cache[key]
|
||||
if time.time() - timestamp > self._ttl:
|
||||
del self._cache[key]
|
||||
return None
|
||||
return df
|
||||
|
||||
def set(self, key: str, df: pd.DataFrame) -> None:
|
||||
"""Cache a DataFrame with current timestamp."""
|
||||
with self._lock:
|
||||
self._cache[key] = (df, time.time())
|
||||
|
||||
def invalidate(self, key: str) -> None:
|
||||
"""Remove a key from cache."""
|
||||
with self._lock:
|
||||
self._cache.pop(key, None)
|
||||
|
||||
|
||||
# Global process-level cache for resource data (30s TTL)
|
||||
_resource_df_cache = _ProcessLevelCache(ttl_seconds=30)
|
||||
_resource_parse_lock = threading.Lock()
|
||||
|
||||
# ============================================================
|
||||
# Configuration
|
||||
# ============================================================
|
||||
@@ -179,6 +219,9 @@ def _sync_to_redis(df: pd.DataFrame, version: str) -> bool:
|
||||
pipe.set(_get_key("meta:count"), str(len(df)))
|
||||
pipe.execute()
|
||||
|
||||
# Invalidate process-level cache so next request picks up new data
|
||||
_resource_df_cache.invalidate("resource_data")
|
||||
|
||||
logger.info(f"Resource cache synced: {len(df)} rows, version={version}")
|
||||
return True
|
||||
except Exception as e:
|
||||
@@ -187,11 +230,26 @@ def _sync_to_redis(df: pd.DataFrame, version: str) -> bool:
|
||||
|
||||
|
||||
def _get_cached_data() -> Optional[pd.DataFrame]:
|
||||
"""Get cached resource data from Redis.
|
||||
"""Get cached resource data from Redis with process-level caching.
|
||||
|
||||
Uses a two-tier cache strategy:
|
||||
1. Process-level cache: Parsed DataFrame (30s TTL) - fast, no parsing
|
||||
2. Redis cache: Raw JSON data - shared across workers
|
||||
|
||||
This prevents redundant JSON parsing across concurrent requests.
|
||||
|
||||
Returns:
|
||||
DataFrame with resource data, or None if cache miss.
|
||||
"""
|
||||
cache_key = "resource_data"
|
||||
|
||||
# Tier 1: Check process-level cache first (fast path)
|
||||
cached_df = _resource_df_cache.get(cache_key)
|
||||
if cached_df is not None:
|
||||
logger.debug(f"Process cache hit: {len(cached_df)} rows")
|
||||
return cached_df
|
||||
|
||||
# Tier 2: Parse from Redis (slow path - needs lock)
|
||||
if not REDIS_ENABLED or not RESOURCE_CACHE_ENABLED:
|
||||
return None
|
||||
|
||||
@@ -199,18 +257,32 @@ def _get_cached_data() -> Optional[pd.DataFrame]:
|
||||
if client is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
data_json = client.get(_get_key("data"))
|
||||
if data_json is None:
|
||||
logger.debug("Resource cache miss: no data in Redis")
|
||||
return None
|
||||
# Use lock to prevent multiple threads from parsing simultaneously
|
||||
with _resource_parse_lock:
|
||||
# Double-check after acquiring lock
|
||||
cached_df = _resource_df_cache.get(cache_key)
|
||||
if cached_df is not None:
|
||||
logger.debug(f"Process cache hit (after lock): {len(cached_df)} rows")
|
||||
return cached_df
|
||||
|
||||
df = pd.read_json(io.StringIO(data_json), orient='records')
|
||||
logger.debug(f"Resource cache hit: loaded {len(df)} rows from Redis")
|
||||
return df
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to read resource cache: {e}")
|
||||
return None
|
||||
try:
|
||||
start_time = time.time()
|
||||
data_json = client.get(_get_key("data"))
|
||||
if data_json is None:
|
||||
logger.debug("Resource cache miss: no data in Redis")
|
||||
return None
|
||||
|
||||
df = pd.read_json(io.StringIO(data_json), orient='records')
|
||||
parse_time = time.time() - start_time
|
||||
|
||||
# Store in process-level cache
|
||||
_resource_df_cache.set(cache_key, df)
|
||||
|
||||
logger.debug(f"Resource cache hit: loaded {len(df)} rows from Redis (parsed in {parse_time:.2f}s)")
|
||||
return df
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to read resource cache: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================
|
||||
|
||||
@@ -15,11 +15,15 @@ class TestGetCachedWipData:
|
||||
|
||||
@pytest.fixture(autouse=True)
def reset_redis(self):
    """Reset Redis client state and process-level cache.

    Runs automatically around every test in the class so that neither the
    memoized Redis client nor the in-process WIP cache leaks state between
    cases.
    """
    import mes_dashboard.core.redis_client as rc
    import mes_dashboard.core.cache as cache

    def _reset_state():
        # Drop the cached client and clear the process-level cache to
        # avoid test interference.
        rc._REDIS_CLIENT = None
        cache._wip_df_cache.clear()

    _reset_state()
    yield
    _reset_state()
|
||||
|
||||
def test_returns_none_when_redis_disabled(self):
|
||||
"""Test returns None when Redis is disabled."""
|
||||
|
||||
Reference in New Issue
Block a user