Three proposals addressing the 2026-02-25 trace pipeline OOM crash (114K CIDs):

1. trace-events-memory-triage: fetchmany iterator (read_sql_df_slow_iter), admission control (50K CID limit for non-MSD), cache skip for large queries, early memory release with gc.collect()
2. trace-async-job-queue: RQ-based async jobs for queries >20K CIDs, separate worker process with isolated memory, frontend polling via useTraceProgress composable, systemd service + deploy scripts
3. trace-streaming-response: chunked Redis storage (TRACE_STREAM_BATCH_SIZE=5000), NDJSON stream endpoint (GET /api/trace/job/<id>/stream), frontend ReadableStream consumer for progressive rendering, backward-compatible with legacy single-key storage

All three proposals archived. 1101 tests pass, frontend builds clean.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
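For reference, a minimal sketch of the batched-iterator contract that the tests below mock: the name read_sql_df_slow_iter and the yielded (columns, rows) batches come from the proposal and the test helpers, while the cursor handling, parameter binding, and batch size shown here are assumptions, not the shipped implementation.

# Illustrative sketch only (not the shipped implementation): a fetchmany-based
# iterator in the spirit of read_sql_df_slow_iter, yielding (columns, rows)
# batches so callers never hold the full result set in memory at once.
from typing import Any, Iterator, Mapping, Optional, Sequence, Tuple


def read_sql_df_slow_iter(
    connection: Any,
    sql: str,
    params: Optional[Mapping[str, Any]] = None,
    batch_size: int = 1000,  # arbitrary default chosen for illustration
) -> Iterator[Tuple[Sequence[str], Sequence[tuple]]]:
    """Yield (column_names, rows) batches from a DB-API 2.0 connection."""
    cursor = connection.cursor()
    try:
        cursor.execute(sql, params or {})
        columns = [desc[0] for desc in cursor.description]
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            yield columns, rows
    finally:
        cursor.close()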
# -*- coding: utf-8 -*-
"""Unit tests for EventFetcher."""

from __future__ import annotations

from unittest.mock import patch

from mes_dashboard.services.event_fetcher import EventFetcher


def _iter_result(columns, rows):
    """Helper: create a generator that yields a single (columns, rows) batch."""
    def _gen(*args, **kwargs):
        yield columns, rows
    return _gen


def _iter_empty(*args, **kwargs):
    """Helper: generator that yields nothing (empty result)."""
    return iter([])


def test_cache_key_is_stable_for_sorted_ids():
    key1 = EventFetcher._cache_key("history", ["CID-B", "CID-A", "CID-A"])
    key2 = EventFetcher._cache_key("history", ["CID-A", "CID-B"])

    assert key1 == key2
    assert key1.startswith("evt:history:")


def test_get_rate_limit_config_supports_env_override(monkeypatch):
    monkeypatch.setenv("EVT_HISTORY_RATE_MAX_REQUESTS", "33")
    monkeypatch.setenv("EVT_HISTORY_RATE_WINDOW_SECONDS", "77")

    config = EventFetcher._get_rate_limit_config("history")

    assert config["bucket"] == "event-history"
    assert config["max_attempts"] == 33
    assert config["window_seconds"] == 77


@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.cache_get")
def test_fetch_events_cache_hit_skips_db(mock_cache_get, mock_iter):
    mock_cache_get.return_value = {"CID-1": [{"CONTAINERID": "CID-1"}]}

    result = EventFetcher.fetch_events(["CID-1"], "materials")

    assert result["CID-1"][0]["CONTAINERID"] == "CID-1"
    mock_iter.assert_not_called()


@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load_with_params")
def test_fetch_events_upstream_history_branch(
    mock_sql_load,
    mock_iter,
    _mock_cache_get,
    mock_cache_set,
):
    mock_sql_load.return_value = "SELECT * FROM UPSTREAM"
    mock_iter.side_effect = _iter_result(
        ["CONTAINERID", "WORKCENTER_GROUP"],
        [("CID-1", "DB"), ("CID-2", "WB")],
    )

    result = EventFetcher.fetch_events(["CID-1", "CID-2"], "upstream_history")

    assert sorted(result.keys()) == ["CID-1", "CID-2"]
    assert mock_sql_load.call_args.args[0] == "mid_section_defect/upstream_history"
    sql_arg, params_arg = mock_iter.call_args.args
    assert len(params_arg) == 2
    mock_cache_set.assert_called_once()
    assert mock_cache_set.call_args.args[0].startswith("evt:upstream_history:")


@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_history_branch_replaces_container_filter(
    mock_sql_load,
    mock_iter,
    _mock_cache_get,
    _mock_cache_set,
):
    mock_sql_load.return_value = (
        "SELECT * FROM t WHERE h.CONTAINERID = :container_id {{ WORKCENTER_FILTER }}"
    )
    mock_iter.side_effect = _iter_empty

    EventFetcher.fetch_events(["CID-1"], "history")

    sql_arg, params_arg = mock_iter.call_args.args
    assert "h.CONTAINERID = :container_id" not in sql_arg
    assert "{{ WORKCENTER_FILTER }}" not in sql_arg
    assert params_arg == {"p0": "CID-1"}


@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_materials_branch_replaces_aliased_container_filter(
    mock_sql_load,
    mock_iter,
    _mock_cache_get,
    _mock_cache_set,
):
    mock_sql_load.return_value = (
        "SELECT * FROM t m WHERE m.CONTAINERID = :container_id ORDER BY TXNDATE"
    )
    mock_iter.side_effect = _iter_empty

    EventFetcher.fetch_events(["CID-1", "CID-2"], "materials")

    sql_arg, params_arg = mock_iter.call_args.args
    assert "m.CONTAINERID = :container_id" not in sql_arg
    assert "IN" in sql_arg.upper()
    assert params_arg == {"p0": "CID-1", "p1": "CID-2"}


@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_rejects_branch_replaces_aliased_container_filter(
    mock_sql_load,
    mock_iter,
    _mock_cache_get,
    _mock_cache_set,
):
    mock_sql_load.return_value = (
        "SELECT * FROM t r LEFT JOIN c ON c.CONTAINERID = r.CONTAINERID "
        "WHERE r.CONTAINERID = :container_id ORDER BY r.TXNDATE"
    )
    mock_iter.side_effect = _iter_empty

    EventFetcher.fetch_events(["CID-1", "CID-2"], "rejects")

    sql_arg, params_arg = mock_iter.call_args.args
    assert "r.CONTAINERID = :container_id" not in sql_arg
    assert "IN" in sql_arg.upper()
    assert params_arg == {"p0": "CID-1", "p1": "CID-2"}


@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_holds_branch_replaces_aliased_container_filter(
    mock_sql_load,
    mock_iter,
    _mock_cache_get,
    _mock_cache_set,
):
    mock_sql_load.return_value = (
        "SELECT * FROM t h LEFT JOIN c ON c.CONTAINERID = h.CONTAINERID "
        "WHERE h.CONTAINERID = :container_id ORDER BY h.HOLDTXNDATE DESC"
    )
    mock_iter.side_effect = _iter_empty

    EventFetcher.fetch_events(["CID-1", "CID-2"], "holds")

    sql_arg, params_arg = mock_iter.call_args.args
    assert "h.CONTAINERID = :container_id" not in sql_arg
    assert "IN" in sql_arg.upper()
    assert params_arg == {"p0": "CID-1", "p1": "CID-2"}


def test_event_fetcher_uses_slow_iter_connection():
    """Regression: event_fetcher must use read_sql_df_slow_iter (non-pooled)."""
    import mes_dashboard.services.event_fetcher as ef
    from mes_dashboard.core.database import read_sql_df_slow_iter

    assert ef.read_sql_df_slow_iter is read_sql_df_slow_iter


@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load_with_params")
def test_fetch_events_sanitizes_nan_values(
    mock_sql_load,
    mock_iter,
    _mock_cache_get,
    _mock_cache_set,
):
    """NaN float values in records should be replaced with None."""
    mock_sql_load.return_value = "SELECT * FROM UPSTREAM"
    mock_iter.side_effect = _iter_result(
        ["CONTAINERID", "VALUE"],
        [("CID-1", float("nan"))],
    )

    result = EventFetcher.fetch_events(["CID-1"], "upstream_history")

    assert result["CID-1"][0]["VALUE"] is None