Three proposals addressing the 2026-02-25 trace pipeline OOM crash (114K CIDs): 1. trace-events-memory-triage: fetchmany iterator (read_sql_df_slow_iter), admission control (50K CID limit for non-MSD), cache skip for large queries, early memory release with gc.collect() 2. trace-async-job-queue: RQ-based async jobs for queries >20K CIDs, separate worker process with isolated memory, frontend polling via useTraceProgress composable, systemd service + deploy scripts 3. trace-streaming-response: chunked Redis storage (TRACE_STREAM_BATCH_SIZE=5000), NDJSON stream endpoint (GET /api/trace/job/<id>/stream), frontend ReadableStream consumer for progressive rendering, backward-compatible with legacy single-key storage All three proposals archived. 1101 tests pass, frontend builds clean. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
508 lines
17 KiB
Python
508 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Unit tests for trace_job_service (async trace job queue)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
import mes_dashboard.services.trace_job_service as tjs
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_async_available
|
|
# ---------------------------------------------------------------------------
|
|
def test_is_async_available_true():
    """Should return True when rq is importable and Redis is up."""
    saved_flag = tjs._RQ_AVAILABLE
    tjs._RQ_AVAILABLE = None  # reset cached flag so availability is re-probed
    try:
        with patch.object(tjs, "get_redis_client", return_value=MagicMock()):
            assert tjs.is_async_available() is True
    finally:
        # Restore the module-level cache so this test cannot leak state
        # into other tests that rely on the flag's prior value.
        tjs._RQ_AVAILABLE = saved_flag
|
|
|
|
|
|
def test_is_async_available_false_no_redis():
    """Should return False when Redis is unavailable."""
    saved_flag = tjs._RQ_AVAILABLE
    tjs._RQ_AVAILABLE = True  # pretend rq imported fine; only Redis is down
    try:
        with patch.object(tjs, "get_redis_client", return_value=None):
            assert tjs.is_async_available() is False
    finally:
        # Restore the module-level cache so this test cannot leak state
        # into other tests that rely on the flag's prior value.
        tjs._RQ_AVAILABLE = saved_flag
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# enqueue_trace_events_job
|
|
# ---------------------------------------------------------------------------
|
|
@patch.object(tjs, "_get_rq_queue")
@patch.object(tjs, "get_redis_client")
def test_enqueue_success(mock_redis, mock_queue_fn):
    """Enqueue should return a job_id and store metadata in Redis."""
    redis_conn = MagicMock()
    rq_queue = MagicMock()
    mock_redis.return_value = redis_conn
    mock_queue_fn.return_value = rq_queue

    job_id, err = tjs.enqueue_trace_events_job(
        "query_tool", ["CID-1", "CID-2"], ["history"], {"params": {}},
    )

    # A prefixed job id comes back with no error.
    assert err is None
    assert job_id is not None
    assert job_id.startswith("trace-evt-")
    # The job was queued and its metadata hash written with a TTL.
    rq_queue.enqueue.assert_called_once()
    redis_conn.hset.assert_called_once()
    redis_conn.expire.assert_called_once()
|
|
|
|
|
|
@patch.object(tjs, "_get_rq_queue", return_value=None)
def test_enqueue_no_queue(mock_queue_fn):
    """Enqueue should return error when queue is unavailable."""
    job_id, err = tjs.enqueue_trace_events_job(
        "query_tool", ["CID-1"], ["history"], {},
    )

    # No job id is produced and the error message mentions unavailability.
    assert job_id is None
    assert "unavailable" in err
|
|
|
|
|
|
@patch.object(tjs, "_get_rq_queue")
@patch.object(tjs, "get_redis_client")
def test_enqueue_queue_error(mock_redis, mock_queue_fn):
    """Enqueue should return error when queue.enqueue raises."""
    redis_conn = MagicMock()
    mock_redis.return_value = redis_conn

    rq_queue = MagicMock()
    rq_queue.enqueue.side_effect = RuntimeError("connection refused")
    mock_queue_fn.return_value = rq_queue

    job_id, err = tjs.enqueue_trace_events_job(
        "query_tool", ["CID-1"], ["history"], {},
    )

    # The failure surfaces as an error string, not an exception.
    assert job_id is None
    assert "connection refused" in err
    # The pre-written metadata key must have been cleaned up.
    redis_conn.delete.assert_called_once()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_job_status
|
|
# ---------------------------------------------------------------------------
|
|
@patch.object(tjs, "get_redis_client")
def test_get_job_status_found(mock_redis):
    """Should return status dict from Redis hash."""
    stored_hash = {
        "profile": "query_tool",
        "cid_count": "100",
        "domains": "history,materials",
        "status": "started",
        "progress": "fetching events",
        "created_at": "1740000000.0",
        "completed_at": "",
        "error": "",
    }
    redis_conn = MagicMock()
    redis_conn.hgetall.return_value = stored_hash
    mock_redis.return_value = redis_conn

    status = tjs.get_job_status("trace-evt-abc123")

    assert status is not None
    # Raw hash strings are parsed into typed fields.
    assert status["job_id"] == "trace-evt-abc123"
    assert status["status"] == "started"
    assert status["cid_count"] == 100
    assert status["domains"] == ["history", "materials"]
    assert status["error"] is None
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_get_job_status_not_found(mock_redis):
    """Should return None when job metadata does not exist."""
    redis_conn = MagicMock()
    redis_conn.hgetall.return_value = {}  # empty hash == missing job
    mock_redis.return_value = redis_conn

    assert tjs.get_job_status("trace-evt-nonexistent") is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_job_result
|
|
# ---------------------------------------------------------------------------
|
|
@patch.object(tjs, "get_redis_client")
def test_get_job_result_found_chunked(mock_redis):
    """Should return reconstructed result from chunked Redis keys."""
    result_meta = {
        "profile": "query_tool",
        "domains": {"history": {"chunks": 1, "total": 1}},
        "failed_domains": [],
    }
    chunk_data = [{"CONTAINERID": "CID-1"}]

    # Map key suffixes to stored payloads; anything else reads as missing.
    payload_by_suffix = {
        ":result:meta": json.dumps(result_meta),
        ":result:history:0": json.dumps(chunk_data),
    }

    def _fake_get(key):
        for suffix, payload in payload_by_suffix.items():
            if key.endswith(suffix):
                return payload
        return None

    redis_conn = MagicMock()
    redis_conn.get.side_effect = _fake_get
    mock_redis.return_value = redis_conn

    result = tjs.get_job_result("trace-evt-abc123")

    assert result is not None
    assert result["stage"] == "events"
    assert result["results"]["history"]["count"] == 1
    assert result["results"]["history"]["total"] == 1
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_get_job_result_found_legacy(mock_redis):
    """Should fall back to legacy single-key result when no chunked meta exists."""
    legacy_payload = {
        "stage": "events",
        "results": {"history": {"data": [{"CONTAINERID": "CID-1"}], "count": 1}},
        "aggregation": None,
    }

    def _fake_get(key):
        # No ":result:meta" key forces the legacy single-key path.
        if key.endswith(":result:meta"):
            return None
        if key.endswith(":result"):
            return json.dumps(legacy_payload)
        return None

    redis_conn = MagicMock()
    redis_conn.get.side_effect = _fake_get
    mock_redis.return_value = redis_conn

    result = tjs.get_job_result("trace-evt-abc123")

    assert result is not None
    assert result["stage"] == "events"
    assert result["results"]["history"]["count"] == 1
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_get_job_result_not_found(mock_redis):
    """Should return None when result key does not exist."""
    redis_conn = MagicMock()
    redis_conn.get.return_value = None  # neither chunked nor legacy keys exist
    mock_redis.return_value = redis_conn

    assert tjs.get_job_result("trace-evt-expired") is None
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_get_job_result_with_domain_filter(mock_redis):
    """Should return filtered result with pagination from chunked storage."""
    result_meta = {
        "profile": "query_tool",
        "domains": {
            "history": {"chunks": 1, "total": 3},
            "materials": {"chunks": 1, "total": 1},
        },
        "failed_domains": [],
    }
    payload_by_suffix = {
        ":result:meta": json.dumps(result_meta),
        ":result:history:0": json.dumps([{"id": 1}, {"id": 2}, {"id": 3}]),
        ":result:materials:0": json.dumps([{"id": 10}]),
    }

    def _fake_get(key):
        for suffix, payload in payload_by_suffix.items():
            if key.endswith(suffix):
                return payload
        return None

    redis_conn = MagicMock()
    redis_conn.get.side_effect = _fake_get
    mock_redis.return_value = redis_conn

    result = tjs.get_job_result("trace-evt-abc", domain="history", offset=1, limit=1)

    # Only the requested domain appears, sliced by offset/limit,
    # while "total" still reports the unfiltered record count.
    assert "history" in result["results"]
    assert "materials" not in result["results"]
    assert result["results"]["history"]["data"] == [{"id": 2}]
    assert result["results"]["history"]["total"] == 3
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# execute_trace_events_job (worker entry point)
|
|
# ---------------------------------------------------------------------------
|
|
@patch.object(tjs, "get_redis_client")
@patch("mes_dashboard.services.event_fetcher.EventFetcher.fetch_events")
def test_execute_job_success(mock_fetch, mock_redis):
    """Worker should fetch events, store result, and update meta to finished."""
    mock_fetch.return_value = {"CID-1": [{"CONTAINERID": "CID-1"}]}

    redis_conn = MagicMock()
    mock_redis.return_value = redis_conn

    tjs.execute_trace_events_job(
        "test-job-1", "query_tool", ["CID-1"], ["history"], {},
    )

    mock_fetch.assert_called_once_with(["CID-1"], "history")

    # Chunked storage: exactly one data chunk plus one result-meta key.
    setex_calls = [call for call in redis_conn.method_calls if call[0] == "setex"]
    assert len(setex_calls) == 2  # 1 chunk + 1 result meta

    # Inspect the result-meta payload.
    meta_calls = [call for call in setex_calls if ":result:meta" in str(call)]
    assert len(meta_calls) == 1
    stored_meta = json.loads(meta_calls[0][1][2])
    assert "history" in stored_meta["domains"]
    assert stored_meta["domains"]["history"]["total"] == 1

    # Inspect the single data chunk.
    chunk_calls = [call for call in setex_calls if ":result:history:0" in str(call)]
    assert len(chunk_calls) == 1
    stored_chunk = json.loads(chunk_calls[0][1][2])
    assert len(stored_chunk) == 1
    assert stored_chunk[0]["CONTAINERID"] == "CID-1"

    # The job metadata hash must end in the "finished" state.
    hset_calls = [call for call in redis_conn.method_calls if call[0] == "hset"]
    final_meta = hset_calls[-1][2]["mapping"]
    assert final_meta["status"] == "finished"
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
@patch("mes_dashboard.services.event_fetcher.EventFetcher.fetch_events")
def test_execute_job_domain_failure_records_partial(mock_fetch, mock_redis):
    """Domain fetch failure should result in partial failure, not job crash."""
    mock_fetch.side_effect = RuntimeError("db timeout")

    redis_conn = MagicMock()
    mock_redis.return_value = redis_conn

    tjs.execute_trace_events_job(
        "test-job-2", "query_tool", ["CID-1"], ["history"], {},
    )

    # Only the result-meta key is written (the domain produced no chunks),
    # and the failed domain is recorded in it.
    setex_calls = [call for call in redis_conn.method_calls if call[0] == "setex"]
    assert len(setex_calls) == 1
    stored_meta = json.loads(setex_calls[0][1][2])
    assert "history" in stored_meta["failed_domains"]

    # A partial failure still completes the job normally.
    hset_calls = [call for call in redis_conn.method_calls if call[0] == "hset"]
    final_meta = hset_calls[-1][2]["mapping"]
    assert final_meta["status"] == "finished"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _store_chunked_result
|
|
# ---------------------------------------------------------------------------
|
|
@patch.object(tjs, "get_redis_client")
def test_store_chunked_result_splits_batches(mock_redis):
    """Large domain data should be split into multiple chunks."""
    redis_conn = MagicMock()
    mock_redis.return_value = redis_conn

    # 12 records with batch size 5 → chunks of 5, 5, and 2.
    records = [{"id": i} for i in range(12)]
    results = {"history": {"data": records, "count": 12}}

    # patch.object restores the original batch size even if the call raises.
    with patch.object(tjs, "TRACE_STREAM_BATCH_SIZE", 5):
        tjs._store_chunked_result(redis_conn, "job-1", "query_tool", results, None, [])

    setex_calls = [call for call in redis_conn.method_calls if call[0] == "setex"]
    # 3 chunk keys + 1 result meta = 4
    assert len(setex_calls) == 4

    # Result meta records the chunk count and the full total.
    meta_calls = [call for call in setex_calls if ":result:meta" in str(call)]
    assert len(meta_calls) == 1
    stored_meta = json.loads(meta_calls[0][1][2])
    assert stored_meta["domains"]["history"]["chunks"] == 3
    assert stored_meta["domains"]["history"]["total"] == 12

    # Three chunk writes, the first holding a full batch of 5.
    chunk_calls = [call for call in setex_calls if ":result:history:" in str(call)]
    assert len(chunk_calls) == 3
    first_chunk = json.loads(chunk_calls[0][1][2])
    assert len(first_chunk) == 5
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_store_chunked_result_with_aggregation(mock_redis):
    """Aggregation should be stored in a separate key."""
    redis_conn = MagicMock()
    mock_redis.return_value = redis_conn

    domain_results = {"history": {"data": [{"id": 1}], "count": 1}}
    aggregation = {"summary": {"total": 100}}

    tjs._store_chunked_result(
        redis_conn, "job-1", "mid_section_defect", domain_results, aggregation, []
    )

    # The aggregation payload gets its own dedicated key.
    setex_calls = [call for call in redis_conn.method_calls if call[0] == "setex"]
    agg_calls = [call for call in setex_calls if ":result:aggregation" in str(call)]
    assert len(agg_calls) == 1
    stored_agg = json.loads(agg_calls[0][1][2])
    assert stored_agg["summary"]["total"] == 100
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# stream_job_result_ndjson
|
|
# ---------------------------------------------------------------------------
|
|
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_chunked(mock_redis):
    """NDJSON stream should yield correct protocol lines for chunked result."""
    result_meta = {
        "profile": "query_tool",
        "domains": {"history": {"chunks": 2, "total": 3}},
        "failed_domains": [],
    }
    payload_by_suffix = {
        ":result:meta": json.dumps(result_meta),
        ":result:history:0": json.dumps([{"id": 1}, {"id": 2}]),
        ":result:history:1": json.dumps([{"id": 3}]),
    }

    def _fake_get(key):
        for suffix, payload in payload_by_suffix.items():
            if key.endswith(suffix):
                return payload
        return None

    redis_conn = MagicMock()
    redis_conn.get.side_effect = _fake_get
    mock_redis.return_value = redis_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    # Protocol order: meta, domain framing around one records line per chunk,
    # then a final complete line.
    types = [line["type"] for line in parsed]
    assert types == ["meta", "domain_start", "records", "records", "domain_end", "complete"]

    assert parsed[0]["domains"] == ["history"]
    assert parsed[1]["domain"] == "history"
    assert parsed[1]["total"] == 3
    assert parsed[2]["count"] == 2
    assert parsed[3]["count"] == 1
    assert parsed[4]["count"] == 3
    assert parsed[5]["total_records"] == 3
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_with_aggregation(mock_redis):
    """NDJSON stream should include aggregation line when present."""
    result_meta = {
        "profile": "mid_section_defect",
        "domains": {"upstream_history": {"chunks": 1, "total": 1}},
        "failed_domains": [],
    }
    payload_by_suffix = {
        ":result:meta": json.dumps(result_meta),
        ":result:upstream_history:0": json.dumps([{"id": 1}]),
        ":result:aggregation": json.dumps({"summary": "ok"}),
    }

    def _fake_get(key):
        for suffix, payload in payload_by_suffix.items():
            if key.endswith(suffix):
                return payload
        return None

    redis_conn = MagicMock()
    redis_conn.get.side_effect = _fake_get
    mock_redis.return_value = redis_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    agg_lines = [line for line in parsed if line["type"] == "aggregation"]
    assert agg_lines  # an aggregation line is emitted
    assert agg_lines[0]["data"]["summary"] == "ok"
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_legacy_fallback(mock_redis):
    """NDJSON stream should emit full_result for legacy single-key storage."""
    legacy_payload = {
        "stage": "events",
        "results": {"history": {"data": [{"id": 1}], "count": 1}},
        "aggregation": None,
    }

    def _fake_get(key):
        # Missing ":result:meta" key triggers the legacy fallback path.
        if key.endswith(":result:meta"):
            return None
        if key.endswith(":result"):
            return json.dumps(legacy_payload)
        return None

    redis_conn = MagicMock()
    redis_conn.get.side_effect = _fake_get
    mock_redis.return_value = redis_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    # Legacy storage streams as a single full_result line.
    assert len(parsed) == 1
    assert parsed[0]["type"] == "full_result"
    assert parsed[0]["data"]["stage"] == "events"
|
|
|
|
|
|
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_with_failed_domains(mock_redis):
    """NDJSON stream should include warning line for partial failures."""
    result_meta = {
        "profile": "query_tool",
        "domains": {"materials": {"chunks": 1, "total": 1}},
        "failed_domains": ["history"],
    }
    payload_by_suffix = {
        ":result:meta": json.dumps(result_meta),
        ":result:materials:0": json.dumps([{"id": 1}]),
    }

    def _fake_get(key):
        for suffix, payload in payload_by_suffix.items():
            if key.endswith(suffix):
                return payload
        return None

    redis_conn = MagicMock()
    redis_conn.get.side_effect = _fake_get
    mock_redis.return_value = redis_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    warnings = [line for line in parsed if line["type"] == "warning"]
    assert warnings  # a warning line is emitted for the failed domain
    assert warnings[0]["code"] == "EVENTS_PARTIAL_FAILURE"
    assert "history" in warnings[0]["failed_domains"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _flatten_domain_records
|
|
# ---------------------------------------------------------------------------
|
|
def test_flatten_domain_records():
    """Flattening should merge per-CID event lists into one row list."""
    per_cid_events = {
        "CID-1": [{"CONTAINERID": "CID-1", "EVT": "A"}],
        "CID-2": [{"CONTAINERID": "CID-2", "EVT": "B"}, {"CONTAINERID": "CID-2", "EVT": "C"}],
    }
    flattened = tjs._flatten_domain_records(per_cid_events)
    # 1 event for CID-1 + 2 events for CID-2 = 3 rows total.
    assert len(flattened) == 3
|