fix(hold): dedup equipment cache, fix portal iframe, improve Hold dashboards

- Equipment cache: add freshness gate so only 1 Oracle query per 5-min cycle
  across 4 gunicorn workers; sync worker waits before first refresh
- Portal: add frame-busting to prevent recursive iframe nesting
- Hold Overview: remove redundant TreeMap, add Product & Future Hold Comment
  columns to LotTable
- Hold History: switch list.sql JOIN from DW_MES_LOT_V (WIP snapshot) to
  DW_MES_CONTAINER (historical master) for reliable Product data; add
  Future Hold Comment column; fix comment truncation with hover tooltip
- Page status: reorganize drawer groupings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
egg
2026-02-11 09:01:02 +08:00
parent be22571421
commit e2ce75b004
18 changed files with 420 additions and 237 deletions

View File

@@ -260,58 +260,49 @@ class TestMesApiStress:
@pytest.mark.stress
class TestPageNavigationStress:
"""Stress tests for rapid page navigation."""
def test_rapid_tab_switching(self, page: Page, app_server: str):
"""Test rapid tab switching in portal."""
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
page.wait_for_timeout(500)
# Only use released pages that are visible without admin login
tabs = [
'.tab:has-text("WIP 即時概況")',
'.tab:has-text("設備即時概況")',
'.tab:has-text("設備歷史績效")',
'.tab:has-text("設備維修查詢")',
]
start_time = time.time()
# Rapidly switch tabs 20 times
for i in range(20):
tab = tabs[i % len(tabs)]
page.locator(tab).click()
page.wait_for_timeout(50)
switch_time = time.time() - start_time
print(f"\n 20 tab switches in {switch_time:.3f}s")
# Page should still be responsive
expect(page.locator('h1')).to_contain_text('MES 報表入口')
print(" Portal remained stable")
def test_portal_iframe_stress(self, page: Page, app_server: str):
"""Test portal remains responsive with iframe loading."""
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
page.wait_for_timeout(500)
# Switch through released tabs (dev tabs hidden without admin login)
tabs = [
'WIP 即時概況',
'設備即時概況',
'設備歷史績效',
'設備維修查詢',
]
for tab_name in tabs:
page.locator(f'.tab:has-text("{tab_name}")').click()
page.wait_for_timeout(200)
# Verify tab is active
tab = page.locator(f'.tab:has-text("{tab_name}")')
expect(tab).to_have_class(re.compile(r'active'))
print(f"\n All {len(tabs)} tabs clickable and responsive")
"""Stress tests for rapid page navigation."""
def test_rapid_tab_switching(self, page: Page, app_server: str):
"""Test rapid tab switching in portal."""
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
sidebar_items = page.locator('.sidebar-item[data-target]')
expect(sidebar_items.first).to_be_visible()
item_count = sidebar_items.count()
assert item_count >= 1, "No portal sidebar pages available for navigation stress test"
start_time = time.time()
# Rapidly switch pages 20 times
for i in range(20):
item = sidebar_items.nth(i % item_count)
item.click()
page.wait_for_timeout(50)
switch_time = time.time() - start_time
print(f"\n 20 sidebar switches in {switch_time:.3f}s")
# Page should still be responsive
expect(page.locator('h1')).to_contain_text('MES 報表入口')
print(" Portal remained stable")
def test_portal_iframe_stress(self, page: Page, app_server: str):
"""Test portal remains responsive with iframe loading."""
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
sidebar_items = page.locator('.sidebar-item[data-target]')
expect(sidebar_items.first).to_be_visible()
item_count = sidebar_items.count()
assert item_count >= 1, "No portal sidebar pages available for iframe stress test"
checked = min(item_count, 4)
for idx in range(checked):
item = sidebar_items.nth(idx)
item.click()
page.wait_for_timeout(200)
# Verify clicked item is active
expect(item).to_have_class(re.compile(r'active'))
print(f"\n All {checked} sidebar pages clickable and responsive")
@pytest.mark.stress

View File

@@ -4,10 +4,11 @@
Tests aggregation, status classification, and cache query functionality.
"""
import pytest
from unittest.mock import patch, MagicMock
import json
import pandas as pd
import pytest
from unittest.mock import patch, MagicMock
import json
from datetime import datetime, timedelta
import pandas as pd
class TestClassifyStatus:
@@ -516,7 +517,7 @@ class TestGetEquipmentStatusCacheStatus:
assert result['count'] == 1000
class TestEquipmentProcessLevelCache:
class TestEquipmentProcessLevelCache:
"""Test bounded process-level cache behavior for equipment status."""
def test_lru_eviction_prefers_recent_keys(self):
@@ -535,11 +536,99 @@ class TestEquipmentProcessLevelCache:
def test_global_equipment_cache_uses_bounded_config(self):
import mes_dashboard.services.realtime_equipment_cache as eq
assert eq.EQUIPMENT_PROCESS_CACHE_MAX_SIZE >= 1
assert eq._equipment_status_cache.max_size == eq.EQUIPMENT_PROCESS_CACHE_MAX_SIZE
class TestSharedQueryFragments:
assert eq.EQUIPMENT_PROCESS_CACHE_MAX_SIZE >= 1
assert eq._equipment_status_cache.max_size == eq.EQUIPMENT_PROCESS_CACHE_MAX_SIZE
class TestEquipmentRefreshDedup:
"""Test refresh de-dup behavior and sync worker startup timing."""
def test_refresh_skips_when_recently_updated(self):
"""Should skip Oracle query when cache is fresh and force=False."""
import mes_dashboard.services.realtime_equipment_cache as eq
recent_updated = (datetime.now() - timedelta(seconds=10)).isoformat()
mock_client = MagicMock()
mock_client.get.return_value = recent_updated
with patch.object(eq, "_SYNC_INTERVAL", 300):
with patch.object(eq, "try_acquire_lock", return_value=True):
with patch.object(eq, "release_lock") as mock_release_lock:
with patch.object(eq, "get_redis_client", return_value=mock_client):
with patch.object(eq, "get_key_prefix", return_value="mes_wip"):
with patch.object(eq, "_load_equipment_status_from_oracle") as mock_oracle:
with patch.object(eq, "_save_to_redis", return_value=True) as mock_save:
result = eq.refresh_equipment_status_cache(force=False)
assert result is False
mock_oracle.assert_not_called()
mock_save.assert_not_called()
mock_client.get.assert_called_once_with("mes_wip:equipment_status:meta:updated")
mock_release_lock.assert_called_once_with("equipment_status_cache_update")
def test_refresh_proceeds_when_stale(self):
"""Should proceed with Oracle query when cache is stale."""
import mes_dashboard.services.realtime_equipment_cache as eq
stale_updated = (datetime.now() - timedelta(seconds=200)).isoformat()
mock_client = MagicMock()
mock_client.get.return_value = stale_updated
with patch.object(eq, "_SYNC_INTERVAL", 300):
with patch.object(eq, "try_acquire_lock", return_value=True):
with patch.object(eq, "release_lock"):
with patch.object(eq, "get_redis_client", return_value=mock_client):
with patch.object(eq, "get_key_prefix", return_value="mes_wip"):
with patch.object(eq, "_load_equipment_status_from_oracle", return_value=[{"RESOURCEID": "R001"}]) as mock_oracle:
with patch.object(eq, "_aggregate_by_resourceid", return_value=[{"RESOURCEID": "R001"}]):
with patch.object(eq, "_save_to_redis", return_value=True) as mock_save:
result = eq.refresh_equipment_status_cache(force=False)
assert result is True
mock_oracle.assert_called_once()
mock_save.assert_called_once()
def test_refresh_proceeds_when_force(self):
"""Should bypass freshness gate when force=True."""
import mes_dashboard.services.realtime_equipment_cache as eq
with patch.object(eq, "_SYNC_INTERVAL", 300):
with patch.object(eq, "try_acquire_lock", return_value=True):
with patch.object(eq, "release_lock"):
with patch.object(eq, "get_redis_client") as mock_get_redis_client:
with patch.object(eq, "_load_equipment_status_from_oracle", return_value=[{"RESOURCEID": "R001"}]) as mock_oracle:
with patch.object(eq, "_aggregate_by_resourceid", return_value=[{"RESOURCEID": "R001"}]):
with patch.object(eq, "_save_to_redis", return_value=True) as mock_save:
result = eq.refresh_equipment_status_cache(force=True)
assert result is True
mock_oracle.assert_called_once()
mock_save.assert_called_once()
mock_get_redis_client.assert_not_called()
def test_sync_worker_waits_before_first_refresh(self):
"""Sync worker should not refresh immediately on startup."""
import mes_dashboard.services.realtime_equipment_cache as eq
class StopImmediatelyEvent:
def __init__(self):
self.timeouts = []
def wait(self, timeout=None):
self.timeouts.append(timeout)
return True
fake_stop_event = StopImmediatelyEvent()
with patch.object(eq, "_STOP_EVENT", fake_stop_event):
with patch.object(eq, "refresh_equipment_status_cache") as mock_refresh:
eq._sync_worker(interval=300)
mock_refresh.assert_not_called()
assert fake_stop_event.timeouts == [300]
class TestSharedQueryFragments:
"""Test shared SQL fragment governance for equipment cache."""
def test_equipment_load_uses_shared_sql_fragment(self):