feat(lineage): unified LineageEngine, EventFetcher, and progressive trace API

Introduce a unified Seed→Lineage→Event pipeline replacing per-page Python
BFS with Oracle CONNECT BY NOCYCLE queries, add staged /api/trace/*
endpoints with rate limiting and L2 Redis caching, and wire progressive
frontend loading via useTraceProgress composable.

Key changes:
- Add LineageEngine (split ancestors / merge sources / full genealogy)
  with QueryBuilder bind-param safety and batched IN clauses
- Add EventFetcher with 6-domain support and L2 Redis cache
- Add trace_routes Blueprint (seed-resolve, lineage, events) with
  profile dispatch, rate limiting, and Redis TTL=300s caching
- Refactor query_tool_service to use LineageEngine and QueryBuilder,
  removing raw string interpolation (SQL injection fix)
- Add rate limits and resolve cache to query_tool_routes
- Integrate useTraceProgress into mid-section-defect with skeleton
  placeholders and fade-in transitions
- Add lineageCache and on-demand lot lineage to query-tool
- Add TraceProgressBar shared component
- Remove legacy query-tool.js static script (3k lines)
- Fix MatrixTable package column truncation (.slice(0,15) removed)
- Archive unified-lineage-engine change, add trace-progressive-ui specs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
egg
2026-02-12 16:30:24 +08:00
parent c38b5f646a
commit 519f8ae2f4
52 changed files with 5074 additions and 4047 deletions

View File

@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""Unit tests for EventFetcher."""
from __future__ import annotations
from unittest.mock import patch
import pandas as pd
from mes_dashboard.services.event_fetcher import EventFetcher
def test_cache_key_is_stable_for_sorted_ids():
    """Cache keys must dedupe and sort IDs so equivalent inputs collide."""
    unsorted_key = EventFetcher._cache_key("history", ["CID-B", "CID-A", "CID-A"])
    sorted_key = EventFetcher._cache_key("history", ["CID-A", "CID-B"])
    # Duplicates and ordering must not influence the key.
    assert unsorted_key == sorted_key
    assert unsorted_key.startswith("evt:history:")
def test_get_rate_limit_config_supports_env_override(monkeypatch):
    """Environment variables override the default per-domain rate limits."""
    monkeypatch.setenv("EVT_HISTORY_RATE_MAX_REQUESTS", "33")
    monkeypatch.setenv("EVT_HISTORY_RATE_WINDOW_SECONDS", "77")
    cfg = EventFetcher._get_rate_limit_config("history")
    assert cfg["bucket"] == "event-history"
    # Both numeric settings come from the environment, parsed as ints.
    assert cfg["max_attempts"] == 33
    assert cfg["window_seconds"] == 77
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.cache_get")
def test_fetch_events_cache_hit_skips_db(mock_cache_get, mock_read_sql_df):
    """A warm L2 cache short-circuits fetch_events without touching the DB."""
    mock_cache_get.return_value = {"CID-1": [{"CONTAINERID": "CID-1"}]}
    events = EventFetcher.fetch_events(["CID-1"], "materials")
    assert events["CID-1"][0]["CONTAINERID"] == "CID-1"
    # No SQL should run on a cache hit.
    mock_read_sql_df.assert_not_called()
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load_with_params")
def test_fetch_events_upstream_history_branch(
    mock_sql_load,
    mock_read_sql_df,
    _mock_cache_get,
    mock_cache_set,
):
    """The upstream_history domain loads its own SQL template, groups rows
    per container id, and writes the result back under a scoped cache key."""
    mock_sql_load.return_value = "SELECT * FROM UPSTREAM"
    db_rows = [
        {"CONTAINERID": "CID-1", "WORKCENTER_GROUP": "DB"},
        {"CONTAINERID": "CID-2", "WORKCENTER_GROUP": "WB"},
    ]
    mock_read_sql_df.return_value = pd.DataFrame(db_rows)

    grouped = EventFetcher.fetch_events(["CID-1", "CID-2"], "upstream_history")

    # One entry per container id, regardless of row order.
    assert sorted(grouped.keys()) == ["CID-1", "CID-2"]
    # The branch must load the dedicated upstream_history template.
    assert mock_sql_load.call_args.args[0] == "mid_section_defect/upstream_history"
    _, bind_params = mock_read_sql_df.call_args.args
    # One bind parameter per requested container id.
    assert len(bind_params) == 2
    mock_cache_set.assert_called_once()
    assert mock_cache_set.call_args.args[0].startswith("evt:upstream_history:")
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_history_branch_replaces_container_filter(
    mock_sql_load,
    mock_read_sql_df,
    _mock_cache_get,
    _mock_cache_set,
):
    """The history branch rewrites the legacy single-id predicate and the
    template placeholder into batched bind parameters."""
    mock_sql_load.return_value = (
        "SELECT * FROM t WHERE h.CONTAINERID = :container_id {{ WORKCENTER_FILTER }}"
    )
    mock_read_sql_df.return_value = pd.DataFrame([])

    EventFetcher.fetch_events(["CID-1"], "history")

    rendered_sql, bind_params = mock_read_sql_df.call_args.args
    # Neither the old single-id predicate nor the placeholder may survive.
    assert "h.CONTAINERID = :container_id" not in rendered_sql
    assert "{{ WORKCENTER_FILTER }}" not in rendered_sql
    # The id arrives as a positional bind parameter instead.
    assert bind_params == {"p0": "CID-1"}

View File

@@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
"""Unit tests for LineageEngine."""
from __future__ import annotations
from unittest.mock import patch
import pandas as pd
from mes_dashboard.services.lineage_engine import LineageEngine
@patch("mes_dashboard.services.lineage_engine.read_sql_df")
def test_resolve_split_ancestors_batches_and_enforces_max_depth(mock_read_sql_df):
    """1001 seed ids split into a 1000-id batch plus a 1-id batch, every
    batch carries the depth guard, and rows past depth 20 are discarded."""
    seed_ids = [f"C{i:04d}" for i in range(1001)]
    first_batch = pd.DataFrame([
        {"CONTAINERID": "C0000", "SPLITFROMID": "P0000",
         "CONTAINERNAME": "LOT-0000", "SPLIT_DEPTH": 1},
        {"CONTAINERID": "P0000", "SPLITFROMID": None,
         "CONTAINERNAME": "LOT-P0000", "SPLIT_DEPTH": 2},
    ])
    second_batch = pd.DataFrame([
        {"CONTAINERID": "C1000", "SPLITFROMID": "P1000",
         "CONTAINERNAME": "LOT-1000", "SPLIT_DEPTH": 1},
        {"CONTAINERID": "C-TOO-DEEP", "SPLITFROMID": "P-TOO-DEEP",
         "CONTAINERNAME": "LOT-DEEP", "SPLIT_DEPTH": 21},
    ])
    mock_read_sql_df.side_effect = [first_batch, second_batch]

    lineage = LineageEngine.resolve_split_ancestors(seed_ids, {"INIT": "LOT-INIT"})

    assert mock_read_sql_df.call_count == 2
    batch_calls = mock_read_sql_df.call_args_list
    # The Oracle depth cap must appear in every batched query.
    for call in batch_calls:
        assert "LEVEL <= 20" in call.args[0]
    # 1001 ids -> one full 1000-param batch plus a 1-param remainder.
    assert len(batch_calls[0].args[1]) == 1000
    assert len(batch_calls[1].args[1]) == 1
    assert lineage["child_to_parent"]["C0000"] == "P0000"
    assert lineage["child_to_parent"]["C1000"] == "P1000"
    # SPLIT_DEPTH 21 exceeds the cap and must be filtered out.
    assert "C-TOO-DEEP" not in lineage["child_to_parent"]
    assert lineage["cid_to_name"]["C0000"] == "LOT-0000"
    # Seed name hints are merged into the returned name map.
    assert lineage["cid_to_name"]["INIT"] == "LOT-INIT"
@patch("mes_dashboard.services.lineage_engine.read_sql_df")
def test_resolve_merge_sources_batches_and_returns_mapping(mock_read_sql_df):
    """1001 finished names batch into 1000 + 1 queries; duplicate source rows
    collapse and null-keyed rows are dropped from the mapping."""
    finished_names = [f"FN{i:04d}" for i in range(1001)]
    mock_read_sql_df.side_effect = [
        pd.DataFrame([
            {"FINISHEDNAME": "FN0000", "SOURCE_CID": "SRC-A"},
            {"FINISHEDNAME": "FN0000", "SOURCE_CID": "SRC-B"},
        ]),
        pd.DataFrame([
            {"FINISHEDNAME": "FN1000", "SOURCE_CID": "SRC-C"},
            {"FINISHEDNAME": "FN1000", "SOURCE_CID": "SRC-C"},
            {"FINISHEDNAME": None, "SOURCE_CID": "SRC-INVALID"},
        ]),
    ]

    mapping = LineageEngine.resolve_merge_sources(finished_names)

    assert mock_read_sql_df.call_count == 2
    batch_calls = mock_read_sql_df.call_args_list
    # The template placeholder must have been substituted in every batch.
    for call in batch_calls:
        assert "{{ FINISHED_NAME_FILTER }}" not in call.args[0]
    assert len(batch_calls[0].args[1]) == 1000
    assert len(batch_calls[1].args[1]) == 1
    assert mapping["FN0000"] == ["SRC-A", "SRC-B"]
    # Repeated SOURCE_CID rows are de-duplicated.
    assert mapping["FN1000"] == ["SRC-C"]
@patch("mes_dashboard.services.lineage_engine.LineageEngine.resolve_merge_sources")
@patch("mes_dashboard.services.lineage_engine.LineageEngine.resolve_split_ancestors")
def test_resolve_full_genealogy_combines_split_and_merge(
    mock_resolve_split_ancestors,
    mock_resolve_merge_sources,
):
    """Full genealogy unions the seed's split-ancestor chain with its merge
    sources and the merge sources' own split ancestry."""
    seed_pass = {
        "child_to_parent": {"A": "B", "B": "C"},
        "cid_to_name": {"A": "LOT-A", "B": "LOT-B", "C": "LOT-C"},
    }
    merge_source_pass = {
        "child_to_parent": {"M1": "M0"},
        "cid_to_name": {"M1": "LOT-M1", "M0": "LOT-M0"},
    }
    mock_resolve_split_ancestors.side_effect = [seed_pass, merge_source_pass]
    # LOT-B (an ancestor's name) was produced by merging container M1.
    mock_resolve_merge_sources.return_value = {"LOT-B": ["M1"]}

    genealogy = LineageEngine.resolve_full_genealogy(["A"], {"A": "LOT-A"})

    # Split chain (B, C) plus merge source (M1) plus its ancestry (M0).
    assert genealogy == {"A": {"B", "C", "M1", "M0"}}
    # Split resolution runs once for the seeds, once for the merge sources.
    assert mock_resolve_split_ancestors.call_count == 2
    mock_resolve_merge_sources.assert_called_once()
@patch("mes_dashboard.services.lineage_engine.read_sql_df")
def test_split_ancestors_matches_legacy_bfs_for_five_known_lots(mock_read_sql_df):
    """Parity check: CONNECT BY-based resolve_split_ancestors must produce the
    same child->parent and cid->name maps as the legacy per-level BFS for five
    representative lots: a two-level chain (L1), a single parent (L2), no
    parent (L3), a three-level chain (L4), and a parent cycle (L5P1 <-> L5P2).
    """
    # Simulated SPLITFROMID parent pointers; L5P1/L5P2 form a two-node cycle.
    parent_by_cid = {
        "L1": "L1P1",
        "L1P1": "L1P2",
        "L2": "L2P1",
        "L3": None,
        "L4": "L4P1",
        "L4P1": "L4P2",
        "L4P2": "L4P3",
        "L5": "L5P1",
        "L5P1": "L5P2",
        "L5P2": "L5P1",
    }
    # Display names returned alongside each container id.
    name_by_cid = {
        "L1": "LOT-1",
        "L1P1": "LOT-1-P1",
        "L1P2": "LOT-1-P2",
        "L2": "LOT-2",
        "L2P1": "LOT-2-P1",
        "L3": "LOT-3",
        "L4": "LOT-4",
        "L4P1": "LOT-4-P1",
        "L4P2": "LOT-4-P2",
        "L4P3": "LOT-4-P3",
        "L5": "LOT-5",
        "L5P1": "LOT-5-P1",
        "L5P2": "LOT-5-P2",
    }
    seed_lots = ["L1", "L2", "L3", "L4", "L5"]
    def _connect_by_rows(start_cids):
        # Emulates Oracle CONNECT BY NOCYCLE ... LEVEL <= 20: walks each seed's
        # parent chain, stopping at depth 20 or when a node repeats (cycle).
        rows = []
        for seed in start_cids:
            current = seed
            depth = 1
            visited = set()
            while current and depth <= 20 and current not in visited:
                visited.add(current)
                rows.append(
                    {
                        "CONTAINERID": current,
                        "SPLITFROMID": parent_by_cid.get(current),
                        "CONTAINERNAME": name_by_cid.get(current),
                        "SPLIT_DEPTH": depth,
                    }
                )
                current = parent_by_cid.get(current)
                depth += 1
        return pd.DataFrame(rows)
    def _mock_read_sql(_sql, params):
        # Bind-param values are the batch of container ids to start from.
        requested = [value for value in params.values()]
        return _connect_by_rows(requested)
    mock_read_sql_df.side_effect = _mock_read_sql
    connect_by_result = LineageEngine.resolve_split_ancestors(seed_lots)
    # Legacy BFS reference implementation from previous mid_section_defect_service.
    legacy_child_to_parent = {}
    legacy_cid_to_name = {}
    frontier = list(seed_lots)
    seen = set(seed_lots)
    rounds = 0
    while frontier:
        rounds += 1
        batch_rows = []
        for cid in frontier:
            batch_rows.append(
                {
                    "CONTAINERID": cid,
                    "SPLITFROMID": parent_by_cid.get(cid),
                    "CONTAINERNAME": name_by_cid.get(cid),
                }
            )
        new_parents = set()
        for row in batch_rows:
            cid = row["CONTAINERID"]
            split_from = row["SPLITFROMID"]
            name = row["CONTAINERNAME"]
            if isinstance(name, str) and name:
                legacy_cid_to_name[cid] = name
            # Self-parents are skipped, mirroring the engine's cycle guard.
            if isinstance(split_from, str) and split_from and split_from != cid:
                legacy_child_to_parent[cid] = split_from
                if split_from not in seen:
                    seen.add(split_from)
                    new_parents.add(split_from)
        frontier = list(new_parents)
        # Safety valve matching the engine's 20-level depth cap.
        if rounds > 20:
            break
    assert connect_by_result["child_to_parent"] == legacy_child_to_parent
    assert connect_by_result["cid_to_name"] == legacy_cid_to_name

View File

@@ -8,6 +8,7 @@ from unittest.mock import patch
import pandas as pd
from mes_dashboard.services.mid_section_defect_service import (
build_trace_aggregation_from_events,
query_analysis,
query_analysis_detail,
query_all_loss_reasons,
@@ -126,3 +127,116 @@ def test_query_all_loss_reasons_cache_miss_queries_and_caches_sorted_values(
{'loss_reasons': ['A_REASON', 'B_REASON']},
ttl=86400,
)
@patch('mes_dashboard.services.mid_section_defect_service.cache_set')
@patch('mes_dashboard.services.mid_section_defect_service.cache_get', return_value=None)
@patch('mes_dashboard.services.mid_section_defect_service.release_lock')
@patch('mes_dashboard.services.mid_section_defect_service.try_acquire_lock', return_value=True)
@patch('mes_dashboard.services.mid_section_defect_service._fetch_upstream_history')
@patch('mes_dashboard.services.mid_section_defect_service._resolve_full_genealogy')
@patch('mes_dashboard.services.mid_section_defect_service._fetch_tmtt_data')
def test_trace_aggregation_matches_query_analysis_summary(
    mock_fetch_tmtt_data,
    mock_resolve_genealogy,
    mock_fetch_upstream_history,
    _mock_lock,
    _mock_release_lock,
    _mock_cache_get,
    _mock_cache_set,
):
    """Parity check: the staged build_trace_aggregation_from_events must
    produce the same summary as the monolithic query_analysis when fed
    equivalent inputs (same TMTT rows, genealogy, and upstream events).

    Cache and lock are stubbed so query_analysis always computes fresh.
    """
    # Raw TMTT rows that query_analysis starts from.
    tmtt_df = pd.DataFrame([
        {
            'CONTAINERID': 'CID-001',
            'CONTAINERNAME': 'LOT-001',
            'TRACKINQTY': 100,
            'REJECTQTY': 5,
            'LOSSREASONNAME': 'R1',
            'WORKFLOW': 'WF-A',
            'PRODUCTLINENAME': 'PKG-A',
            'PJ_TYPE': 'TYPE-A',
            'TMTT_EQUIPMENTNAME': 'TMTT-01',
            'TRACKINTIMESTAMP': '2025-01-10 10:00:00',
            'FINISHEDRUNCARD': 'FR-001',
        },
        {
            'CONTAINERID': 'CID-002',
            'CONTAINERNAME': 'LOT-002',
            'TRACKINQTY': 120,
            'REJECTQTY': 6,
            'LOSSREASONNAME': 'R2',
            'WORKFLOW': 'WF-B',
            'PRODUCTLINENAME': 'PKG-B',
            'PJ_TYPE': 'TYPE-B',
            'TMTT_EQUIPMENTNAME': 'TMTT-02',
            'TRACKINTIMESTAMP': '2025-01-11 10:00:00',
            'FINISHEDRUNCARD': 'FR-002',
        },
    ])
    # CID-001 has one upstream ancestor; CID-002 has none.
    ancestors = {
        'CID-001': {'CID-101'},
        'CID-002': set(),
    }
    # Service-normalized upstream rows (lowercase keys) for query_analysis.
    upstream_normalized = {
        'CID-101': [{
            'workcenter_group': '中段',
            'equipment_id': 'EQ-01',
            'equipment_name': 'EQ-01',
            'spec_name': 'SPEC-A',
            'track_in_time': '2025-01-09 08:00:00',
        }],
        'CID-002': [{
            'workcenter_group': '中段',
            'equipment_id': 'EQ-02',
            'equipment_name': 'EQ-02',
            'spec_name': 'SPEC-B',
            'track_in_time': '2025-01-11 08:00:00',
        }],
    }
    # Same events in raw column form (uppercase keys) for the staged path.
    upstream_events = {
        'CID-101': [{
            'WORKCENTER_GROUP': '中段',
            'EQUIPMENTID': 'EQ-01',
            'EQUIPMENTNAME': 'EQ-01',
            'SPECNAME': 'SPEC-A',
            'TRACKINTIMESTAMP': '2025-01-09 08:00:00',
        }],
        'CID-002': [{
            'WORKCENTER_GROUP': '中段',
            'EQUIPMENTID': 'EQ-02',
            'EQUIPMENTNAME': 'EQ-02',
            'SPECNAME': 'SPEC-B',
            'TRACKINTIMESTAMP': '2025-01-11 08:00:00',
        }],
    }
    mock_fetch_tmtt_data.return_value = tmtt_df
    mock_resolve_genealogy.return_value = ancestors
    mock_fetch_upstream_history.return_value = upstream_normalized
    # Monolithic path: fetch + genealogy + upstream inside one call.
    summary = query_analysis('2025-01-01', '2025-01-31')
    # Staged path: the same inputs supplied explicitly by the trace API.
    staged_summary = build_trace_aggregation_from_events(
        '2025-01-01',
        '2025-01-31',
        seed_container_ids=['CID-001', 'CID-002'],
        lineage_ancestors={
            'CID-001': ['CID-101'],
            'CID-002': [],
        },
        upstream_events_by_cid=upstream_events,
    )
    assert staged_summary['available_loss_reasons'] == summary['available_loss_reasons']
    assert staged_summary['genealogy_status'] == summary['genealogy_status']
    assert staged_summary['detail_total_count'] == len(summary['detail'])
    assert staged_summary['kpi']['total_input'] == summary['kpi']['total_input']
    assert staged_summary['kpi']['lot_count'] == summary['kpi']['lot_count']
    assert staged_summary['kpi']['total_defect_qty'] == summary['kpi']['total_defect_qty']
    # Defect rate may differ by rounding between the two pipelines.
    assert abs(
        staged_summary['kpi']['total_defect_rate'] - summary['kpi']['total_defect_rate']
    ) <= 0.01
    assert staged_summary['daily_trend'] == summary['daily_trend']
    assert staged_summary['charts'].keys() == summary['charts'].keys()

View File

@@ -7,25 +7,35 @@ Tests the API endpoints with mocked service dependencies:
- Error handling
"""
import pytest
import json
from unittest.mock import patch, MagicMock
from mes_dashboard import create_app
import pytest
import json
from unittest.mock import patch, MagicMock
from mes_dashboard import create_app
from mes_dashboard.core.cache import NoOpCache
from mes_dashboard.core.rate_limit import reset_rate_limits_for_tests
@pytest.fixture
def app():
"""Create test Flask application."""
app = create_app()
app.config['TESTING'] = True
return app
def app():
"""Create test Flask application."""
app = create_app()
app.config['TESTING'] = True
app.extensions["cache"] = NoOpCache()
return app
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
@pytest.fixture(autouse=True)
def _reset_rate_limits():
reset_rate_limits_for_tests()
yield
reset_rate_limits_for_tests()
class TestQueryToolPage:
@@ -129,8 +139,8 @@ class TestResolveEndpoint:
assert data['total'] == 1
assert data['data'][0]['lot_id'] == 'GA23100020-A00-001'
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_not_found(self, mock_resolve, client):
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_not_found(self, mock_resolve, client):
"""Should return not_found list for missing LOT IDs."""
mock_resolve.return_value = {
'data': [],
@@ -148,8 +158,56 @@ class TestResolveEndpoint:
)
assert response.status_code == 200
data = json.loads(response.data)
assert data['total'] == 0
assert 'INVALID-LOT-ID' in data['not_found']
assert data['total'] == 0
assert 'INVALID-LOT-ID' in data['not_found']
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
@patch('mes_dashboard.routes.query_tool_routes.cache_get')
def test_resolve_cache_hit_skips_service(self, mock_cache_get, mock_resolve, client):
    """A cached resolve payload is served without calling resolve_lots."""
    cached_payload = {
        'data': [{'container_id': 'C1', 'input_value': 'LOT-1'}],
        'total': 1,
        'input_count': 1,
        'not_found': [],
    }
    mock_cache_get.return_value = cached_payload
    response = client.post(
        '/api/query-tool/resolve',
        json={'input_type': 'lot_id', 'values': ['LOT-1']},
    )
    assert response.status_code == 200
    body = response.get_json()
    assert body['total'] == 1
    # The service layer must never run on a cache hit.
    mock_resolve.assert_not_called()
@patch('mes_dashboard.routes.query_tool_routes.cache_set')
@patch('mes_dashboard.routes.query_tool_routes.cache_get', return_value=None)
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_success_caches_result(
    self,
    mock_resolve,
    _mock_cache_get,
    mock_cache_set,
    client,
):
    """On a cache miss, a successful resolve is cached for 60 seconds."""
    mock_resolve.return_value = {
        'data': [{'container_id': 'C1', 'input_value': 'LOT-1'}],
        'total': 1,
        'input_count': 1,
        'not_found': [],
    }
    response = client.post(
        '/api/query-tool/resolve',
        json={'input_type': 'lot_id', 'values': ['LOT-1']},
    )
    assert response.status_code == 200
    mock_cache_set.assert_called_once()
    written_key = mock_cache_set.call_args.args[0]
    # Keys are namespaced by input type for targeted invalidation.
    assert written_key.startswith('qt:resolve:lot_id:')
    assert mock_cache_set.call_args.kwargs['ttl'] == 60
class TestLotHistoryEndpoint:
@@ -267,7 +325,7 @@ class TestAdjacentLotsEndpoint:
assert '2024-01-15' in call_args[0][1] # target_time
class TestLotAssociationsEndpoint:
class TestLotAssociationsEndpoint:
"""Tests for /api/query-tool/lot-associations endpoint."""
def test_missing_container_id(self, client):
@@ -294,8 +352,8 @@ class TestLotAssociationsEndpoint:
assert 'error' in data
assert '不支援' in data['error'] or 'type' in data['error'].lower()
@patch('mes_dashboard.routes.query_tool_routes.get_lot_materials')
def test_lot_materials_success(self, mock_query, client):
@patch('mes_dashboard.routes.query_tool_routes.get_lot_materials')
def test_lot_materials_success(self, mock_query, client):
"""Should return lot materials on success."""
mock_query.return_value = {
'data': [
@@ -313,8 +371,137 @@ class TestLotAssociationsEndpoint:
)
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert data['total'] == 1
assert 'data' in data
assert data['total'] == 1
@patch('mes_dashboard.routes.query_tool_routes.get_lot_splits')
def test_lot_splits_default_fast_mode(self, mock_query, client):
    """Without full_history, splits default to the fast (windowed) mode."""
    mock_query.return_value = {'data': [], 'total': 0}
    response = client.get(
        '/api/query-tool/lot-associations?container_id=488103800029578b&type=splits'
    )
    assert response.status_code == 200
    # The service must be invoked with full_history explicitly False.
    mock_query.assert_called_once_with('488103800029578b', full_history=False)
@patch('mes_dashboard.routes.query_tool_routes.get_lot_splits')
def test_lot_splits_full_history_mode(self, mock_query, client):
    """full_history=true in the query string flips the service into full mode."""
    mock_query.return_value = {'data': [], 'total': 0}
    response = client.get(
        '/api/query-tool/lot-associations?'
        'container_id=488103800029578b&type=splits&full_history=true'
    )
    assert response.status_code == 200
    mock_query.assert_called_once_with('488103800029578b', full_history=True)
class TestQueryToolRateLimit:
    """Rate-limit behavior for high-cost query-tool endpoints."""

    @staticmethod
    def _assert_throttled(response, retry_after, service_mock):
        # Shared 429 contract: status code, Retry-After header, error payload,
        # and the underlying service must never be invoked.
        assert response.status_code == 429
        assert response.headers.get('Retry-After') == str(retry_after)
        payload = response.get_json()
        assert payload['error']['code'] == 'TOO_MANY_REQUESTS'
        service_mock.assert_not_called()

    @patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
    @patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 5))
    def test_resolve_rate_limited_returns_429(self, _mock_limit, mock_resolve, client):
        response = client.post(
            '/api/query-tool/resolve',
            json={'input_type': 'lot_id', 'values': ['GA23100020-A00-001']},
        )
        self._assert_throttled(response, 5, mock_resolve)

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    @patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 6))
    def test_lot_history_rate_limited_returns_429(self, _mock_limit, mock_history, client):
        response = client.get('/api/query-tool/lot-history?container_id=488103800029578b')
        self._assert_throttled(response, 6, mock_history)

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_materials')
    @patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 7))
    def test_lot_association_rate_limited_returns_429(
        self,
        _mock_limit,
        mock_materials,
        client,
    ):
        response = client.get(
            '/api/query-tool/lot-associations?container_id=488103800029578b&type=materials'
        )
        self._assert_throttled(response, 7, mock_materials)

    @patch('mes_dashboard.routes.query_tool_routes.get_adjacent_lots')
    @patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 8))
    def test_adjacent_lots_rate_limited_returns_429(
        self,
        _mock_limit,
        mock_adjacent,
        client,
    ):
        response = client.get(
            '/api/query-tool/adjacent-lots?equipment_id=EQ001&target_time=2024-01-15T10:30:00'
        )
        self._assert_throttled(response, 8, mock_adjacent)

    @patch('mes_dashboard.routes.query_tool_routes.get_equipment_status_hours')
    @patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 9))
    def test_equipment_period_rate_limited_returns_429(
        self,
        _mock_limit,
        mock_equipment,
        client,
    ):
        response = client.post(
            '/api/query-tool/equipment-period',
            json={
                'equipment_ids': ['EQ001'],
                'start_date': '2024-01-01',
                'end_date': '2024-01-31',
                'query_type': 'status_hours',
            },
        )
        self._assert_throttled(response, 9, mock_equipment)

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    @patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 10))
    def test_export_rate_limited_returns_429(self, _mock_limit, mock_history, client):
        response = client.post(
            '/api/query-tool/export-csv',
            json={
                'export_type': 'lot_history',
                'params': {'container_id': '488103800029578b'},
            },
        )
        self._assert_throttled(response, 10, mock_history)
class TestEquipmentPeriodEndpoint:

View File

@@ -8,15 +8,17 @@ Tests the core service functions without database dependencies:
"""
import pytest
from mes_dashboard.services.query_tool_service import (
validate_date_range,
validate_lot_input,
validate_equipment_input,
_build_in_clause,
_build_in_filter,
BATCH_SIZE,
MAX_LOT_IDS,
MAX_SERIAL_NUMBERS,
from mes_dashboard.services.query_tool_service import (
validate_date_range,
validate_lot_input,
validate_equipment_input,
_resolve_by_lot_id,
_resolve_by_serial_number,
_resolve_by_work_order,
get_lot_split_merge_history,
BATCH_SIZE,
MAX_LOT_IDS,
MAX_SERIAL_NUMBERS,
MAX_WORK_ORDERS,
MAX_EQUIPMENTS,
MAX_DATE_RANGE_DAYS,
@@ -184,86 +186,124 @@ class TestValidateEquipmentInput:
assert result is None
class TestBuildInClause:
    """Tests for the _build_in_clause quoting/chunking helper."""

    def test_empty_list(self):
        """No input values -> no chunks at all."""
        assert _build_in_clause([]) == []

    def test_single_value(self):
        """A lone value becomes one single-quoted chunk."""
        chunks = _build_in_clause(['VAL001'])
        assert len(chunks) == 1
        assert chunks[0] == "'VAL001'"

    def test_multiple_values(self):
        """Values under the batch size share one comma-joined chunk."""
        chunks = _build_in_clause(['VAL001', 'VAL002', 'VAL003'])
        assert len(chunks) == 1
        assert chunks[0] == "'VAL001', 'VAL002', 'VAL003'"

    def test_chunking(self):
        """Exceeding BATCH_SIZE spills the remainder into a second chunk."""
        values = [f'VAL{i:06d}' for i in range(BATCH_SIZE + 10)]
        chunks = _build_in_clause(values)
        assert len(chunks) == 2
        # Each value carries an opening and closing quote.
        assert chunks[0].count("'") == BATCH_SIZE * 2

    def test_escape_single_quotes(self):
        """Embedded single quotes are doubled, Oracle-literal style."""
        chunks = _build_in_clause(["VAL'001"])
        assert len(chunks) == 1
        assert "VAL''001" in chunks[0]

    def test_custom_chunk_size(self):
        """An explicit max_chunk_size overrides the default BATCH_SIZE."""
        chunks = _build_in_clause(['V1', 'V2', 'V3', 'V4', 'V5'], max_chunk_size=2)
        assert len(chunks) == 3  # 2 + 2 + 1
class TestBuildInFilter:
    """Tests for the _build_in_filter SQL predicate helper."""

    def test_empty_list(self):
        """Empty input yields the always-false predicate so nothing matches."""
        assert _build_in_filter([], 'COL') == "1=0"

    def test_single_value(self):
        """A single value produces a plain IN clause."""
        predicate = _build_in_filter(['VAL001'], 'COL')
        assert "COL IN" in predicate
        assert "'VAL001'" in predicate

    def test_multiple_values(self):
        """All supplied values appear inside the IN clause."""
        predicate = _build_in_filter(['VAL001', 'VAL002'], 'COL')
        assert "COL IN" in predicate
        for literal in ("'VAL001'", "'VAL002'"):
            assert literal in predicate

    def test_custom_column(self):
        """The column argument is used verbatim, table alias included."""
        assert "t.MYCOL IN" in _build_in_filter(['VAL001'], 't.MYCOL')

    def test_large_list_uses_or(self):
        """Chunked inputs are OR-ed together inside one parenthesized group."""
        values = [f'VAL{i:06d}' for i in range(BATCH_SIZE + 10)]
        predicate = _build_in_filter(values, 'COL')
        assert " OR " in predicate
        assert predicate.startswith("(")
        assert predicate.endswith(")")
class TestResolveQueriesUseBindParams:
    """Queries built from user input must go through QueryBuilder bind params."""

    def test_resolve_by_lot_id_uses_query_builder_params(self):
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.SQLLoader.load_with_params') as mock_load, \
                patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read:
            mock_load.return_value = "SELECT * FROM DUAL"
            mock_read.return_value = pd.DataFrame([{
                'CONTAINERID': 'CID-1',
                'CONTAINERNAME': 'LOT-1',
                'SPECNAME': 'SPEC-1',
                'QTY': 100,
            }])
            resolved = _resolve_by_lot_id(['LOT-1'])
            assert resolved['total'] == 1
            mock_load.assert_called_once()
            template_kwargs = mock_load.call_args.kwargs
            # The filter fragment must carry a bind placeholder, not a literal.
            assert 'CONTAINER_FILTER' in template_kwargs
            assert ':p0' in template_kwargs['CONTAINER_FILTER']
            _, bind_params = mock_read.call_args.args
            assert bind_params == {'p0': 'LOT-1'}

    def test_resolve_by_serial_number_uses_query_builder_params(self):
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.SQLLoader.load_with_params') as mock_load, \
                patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read:
            mock_load.return_value = "SELECT * FROM DUAL"
            mock_read.return_value = pd.DataFrame([{
                'CONTAINERID': 'CID-1',
                'FINISHEDNAME': 'SN-1',
                'CONTAINERNAME': 'LOT-1',
                'SPECNAME': 'SPEC-1',
            }])
            resolved = _resolve_by_serial_number(['SN-1'])
            assert resolved['total'] == 1
            assert ':p0' in mock_load.call_args.kwargs['SERIAL_FILTER']
            _, bind_params = mock_read.call_args.args
            assert bind_params == {'p0': 'SN-1'}

    def test_resolve_by_work_order_uses_query_builder_params(self):
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.SQLLoader.load_with_params') as mock_load, \
                patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read:
            mock_load.return_value = "SELECT * FROM DUAL"
            mock_read.return_value = pd.DataFrame([{
                'CONTAINERID': 'CID-1',
                'PJ_WORKORDER': 'WO-1',
                'CONTAINERNAME': 'LOT-1',
                'SPECNAME': 'SPEC-1',
            }])
            resolved = _resolve_by_work_order(['WO-1'])
            assert resolved['total'] == 1
            assert ':p0' in mock_load.call_args.kwargs['WORK_ORDER_FILTER']
            _, bind_params = mock_read.call_args.args
            assert bind_params == {'p0': 'WO-1'}
class TestSplitMergeHistoryMode:
    """Fast mode should use read_sql_df, full mode should use read_sql_df_slow."""

    def test_fast_mode_uses_time_window_and_row_limit(self):
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.SQLLoader.load_with_params') as mock_load, \
                patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_fast, \
                patch('mes_dashboard.services.query_tool_service.read_sql_df_slow') as mock_slow:
            mock_load.return_value = "SELECT * FROM DUAL"
            mock_fast.return_value = pd.DataFrame([])
            history = get_lot_split_merge_history('WO-1', full_history=False)
            assert history['mode'] == 'fast'
            template_kwargs = mock_load.call_args.kwargs
            # Fast mode constrains both the time window and the row count.
            assert "ADD_MONTHS(SYSDATE, -6)" in template_kwargs['TIME_WINDOW']
            assert template_kwargs['ROW_LIMIT'] == "FETCH FIRST 500 ROWS ONLY"
            mock_fast.assert_called_once()
            mock_slow.assert_not_called()

    def test_full_mode_uses_slow_query_without_limits(self):
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.SQLLoader.load_with_params') as mock_load, \
                patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_fast, \
                patch('mes_dashboard.services.query_tool_service.read_sql_df_slow') as mock_slow:
            mock_load.return_value = "SELECT * FROM DUAL"
            mock_slow.return_value = pd.DataFrame([])
            history = get_lot_split_merge_history('WO-1', full_history=True)
            assert history['mode'] == 'full'
            template_kwargs = mock_load.call_args.kwargs
            # Full mode drops both limits and routes through the slow pool.
            assert template_kwargs['TIME_WINDOW'] == ''
            assert template_kwargs['ROW_LIMIT'] == ''
            mock_fast.assert_not_called()
            mock_slow.assert_called_once()
class TestServiceConstants:
@@ -323,98 +363,78 @@ class TestGetWorkcenterForGroups:
assert result == []
class TestGetLotHistoryWithWorkcenterFilter:
"""Tests for get_lot_history with workcenter_groups filter."""
def test_no_filter_returns_all(self):
"""When no workcenter_groups, should not add filter to SQL."""
from unittest.mock import patch, MagicMock
import pandas as pd
with patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read:
with patch('mes_dashboard.services.query_tool_service.SQLLoader') as mock_loader:
from mes_dashboard.services.query_tool_service import get_lot_history
mock_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
mock_read.return_value = pd.DataFrame({
'CONTAINERID': ['abc123'],
'WORKCENTERNAME': ['DB_1'],
})
result = get_lot_history('abc123', workcenter_groups=None)
assert 'error' not in result
assert result['filtered_by_groups'] == []
# Verify SQL does not contain WORKCENTERNAME IN
sql_called = mock_read.call_args[0][0]
assert 'WORKCENTERNAME IN' not in sql_called
assert '{{ WORKCENTER_FILTER }}' not in sql_called
def test_with_filter_adds_condition(self):
"""When workcenter_groups provided, should filter by workcenters."""
from unittest.mock import patch
import pandas as pd
with patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read:
with patch('mes_dashboard.services.query_tool_service.SQLLoader') as mock_loader:
with patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as mock_get_wc:
from mes_dashboard.services.query_tool_service import get_lot_history
mock_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
mock_get_wc.return_value = ['DB_1', 'DB_2']
mock_read.return_value = pd.DataFrame({
'CONTAINERID': ['abc123'],
'WORKCENTERNAME': ['DB_1'],
})
result = get_lot_history('abc123', workcenter_groups=['DB'])
mock_get_wc.assert_called_once_with(['DB'])
assert result['filtered_by_groups'] == ['DB']
# Verify SQL contains filter
sql_called = mock_read.call_args[0][0]
assert 'WORKCENTERNAME' in sql_called
def test_empty_groups_list_no_filter(self):
"""Empty groups list should return all (no filter)."""
from unittest.mock import patch
import pandas as pd
with patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read:
with patch('mes_dashboard.services.query_tool_service.SQLLoader') as mock_loader:
from mes_dashboard.services.query_tool_service import get_lot_history
mock_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
mock_read.return_value = pd.DataFrame({
'CONTAINERID': ['abc123'],
'WORKCENTERNAME': ['DB_1'],
})
result = get_lot_history('abc123', workcenter_groups=[])
assert result['filtered_by_groups'] == []
# Verify SQL does not contain WORKCENTERNAME IN
sql_called = mock_read.call_args[0][0]
assert 'WORKCENTERNAME IN' not in sql_called
def test_filter_with_empty_workcenters_result(self):
    """A group resolving to zero workcenters must not add a filter nor fail."""
    from unittest.mock import patch
    import pandas as pd
    with patch('mes_dashboard.services.query_tool_service.read_sql_df') as fake_read, \
            patch('mes_dashboard.services.query_tool_service.SQLLoader') as fake_loader, \
            patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as fake_groups:
        from mes_dashboard.services.query_tool_service import get_lot_history
        fake_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
        # Group lookup yields nothing -> the service should skip the filter.
        fake_groups.return_value = []
        fake_read.return_value = pd.DataFrame({
            'CONTAINERID': ['abc123'],
            'WORKCENTERNAME': ['DB_1'],
        })
        outcome = get_lot_history('abc123', workcenter_groups=['UNKNOWN'])
        # Query still succeeds; the empty mapping is treated as "no filter".
        assert 'error' not in outcome
class TestGetLotHistoryWithWorkcenterFilter:
    """Tests for get_lot_history with workcenter_groups filter (EventFetcher path)."""

    def test_no_filter_returns_all(self):
        """Without workcenter_groups every fetched event is returned."""
        from unittest.mock import patch
        from mes_dashboard.services.query_tool_service import get_lot_history
        events = {
            'abc123': [
                {'CONTAINERID': 'abc123', 'WORKCENTERNAME': 'DB_1'},
                {'CONTAINERID': 'abc123', 'WORKCENTERNAME': 'WB_1'},
            ]
        }
        with patch('mes_dashboard.services.query_tool_service.EventFetcher.fetch_events') as fake_fetch:
            fake_fetch.return_value = events
            outcome = get_lot_history('abc123', workcenter_groups=None)
        assert 'error' not in outcome
        assert outcome['filtered_by_groups'] == []
        assert outcome['total'] == 2

    def test_with_filter_adds_condition(self):
        """Given workcenter_groups, only events in matching workcenters survive."""
        from unittest.mock import patch
        from mes_dashboard.services.query_tool_service import get_lot_history
        events = {
            'abc123': [
                {'CONTAINERID': 'abc123', 'WORKCENTERNAME': 'DB_1'},
                {'CONTAINERID': 'abc123', 'WORKCENTERNAME': 'WB_1'},
            ]
        }
        with patch('mes_dashboard.services.query_tool_service.EventFetcher.fetch_events') as fake_fetch, \
                patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as fake_groups:
            fake_fetch.return_value = events
            fake_groups.return_value = ['DB_1']
            outcome = get_lot_history('abc123', workcenter_groups=['DB'])
            fake_groups.assert_called_once_with(['DB'])
        assert outcome['filtered_by_groups'] == ['DB']
        assert outcome['total'] == 1
        assert outcome['data'][0]['WORKCENTERNAME'] == 'DB_1'

    def test_empty_groups_list_no_filter(self):
        """An empty groups list is treated the same as no filter at all."""
        from unittest.mock import patch
        from mes_dashboard.services.query_tool_service import get_lot_history
        with patch('mes_dashboard.services.query_tool_service.EventFetcher.fetch_events') as fake_fetch:
            fake_fetch.return_value = {
                'abc123': [{'CONTAINERID': 'abc123', 'WORKCENTERNAME': 'DB_1'}]
            }
            outcome = get_lot_history('abc123', workcenter_groups=[])
        assert outcome['filtered_by_groups'] == []
        assert outcome['total'] == 1

    def test_filter_with_empty_workcenters_result(self):
        """A group with no resolved workcenters must not drop rows or error out."""
        from unittest.mock import patch
        from mes_dashboard.services.query_tool_service import get_lot_history
        with patch('mes_dashboard.services.query_tool_service.EventFetcher.fetch_events') as fake_fetch, \
                patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as fake_groups:
            fake_fetch.return_value = {
                'abc123': [{'CONTAINERID': 'abc123', 'WORKCENTERNAME': 'DB_1'}]
            }
            # Unknown group resolves to an empty workcenter list.
            fake_groups.return_value = []
            outcome = get_lot_history('abc123', workcenter_groups=['UNKNOWN'])
        assert 'error' not in outcome
        assert outcome['total'] == 1

245
tests/test_trace_routes.py Normal file
View File

@@ -0,0 +1,245 @@
# -*- coding: utf-8 -*-
"""Route tests for staged trace API endpoints."""
from __future__ import annotations
from unittest.mock import patch
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
from mes_dashboard.core.cache import NoOpCache
from mes_dashboard.core.rate_limit import reset_rate_limits_for_tests
def _client():
    """Build a Flask test client on a fresh app with real caching disabled."""
    # Drop any cached SQLAlchemy engine so each test starts clean.
    db._ENGINE = None
    flask_app = create_app('testing')
    flask_app.config['TESTING'] = True
    flask_app.extensions["cache"] = NoOpCache()
    return flask_app.test_client()
def setup_function():
    """Reset in-memory rate-limit counters before each test function."""
    reset_rate_limits_for_tests()
def teardown_function():
    """Reset rate-limit counters after each test so later tests aren't throttled."""
    reset_rate_limits_for_tests()
@patch('mes_dashboard.routes.trace_routes.resolve_lots')
def test_seed_resolve_query_tool_success(mock_resolve_lots):
    """Happy path: a resolved lot comes back as a seed with a cache key."""
    mock_resolve_lots.return_value = {
        'data': [
            {
                'container_id': 'CID-001',
                'lot_id': 'LOT-001',
            }
        ]
    }
    body = {
        'profile': 'query_tool',
        'params': {
            'resolve_type': 'lot_id',
            'values': ['LOT-001'],
        },
    }
    response = _client().post('/api/trace/seed-resolve', json=body)
    assert response.status_code == 200
    payload = response.get_json()
    assert payload['stage'] == 'seed-resolve'
    assert payload['seed_count'] == 1
    first_seed = payload['seeds'][0]
    assert first_seed['container_id'] == 'CID-001'
    assert first_seed['container_name'] == 'LOT-001'
    assert payload['cache_key'].startswith('trace:seed:query_tool:')
def test_seed_resolve_invalid_profile_returns_400():
    """An unknown profile name is rejected with 400 / INVALID_PROFILE."""
    body = {
        'profile': 'invalid',
        'params': {},
    }
    response = _client().post('/api/trace/seed-resolve', json=body)
    assert response.status_code == 400
    payload = response.get_json()
    assert payload['error']['code'] == 'INVALID_PROFILE'
@patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 8))
def test_seed_resolve_rate_limited_returns_429(_mock_rate_limit):
    """When the limiter trips, seed-resolve replies 429 with Retry-After."""
    body = {
        'profile': 'query_tool',
        'params': {'resolve_type': 'lot_id', 'values': ['LOT-001']},
    }
    response = _client().post('/api/trace/seed-resolve', json=body)
    assert response.status_code == 429
    # Retry-After mirrors the seconds reported by the limiter.
    assert response.headers.get('Retry-After') == '8'
    payload = response.get_json()
    assert payload['error']['code'] == 'TOO_MANY_REQUESTS'
@patch('mes_dashboard.routes.trace_routes.LineageEngine.resolve_full_genealogy')
def test_lineage_success_returns_snake_case(mock_resolve_genealogy):
    """Lineage response uses snake_case keys and counts seed + ancestors."""
    mock_resolve_genealogy.return_value = {
        'CID-001': {'CID-A', 'CID-B'}
    }
    body = {
        'profile': 'query_tool',
        'container_ids': ['CID-001'],
    }
    response = _client().post('/api/trace/lineage', json=body)
    assert response.status_code == 200
    payload = response.get_json()
    assert payload['stage'] == 'lineage'
    assert sorted(payload['ancestors']['CID-001']) == ['CID-A', 'CID-B']
    # 1 seed + 2 ancestors.
    assert payload['total_nodes'] == 3
    # camelCase variants must not leak into the response contract.
    assert 'totalNodes' not in payload
@patch('mes_dashboard.routes.trace_routes.LineageEngine.resolve_full_genealogy',
       side_effect=TimeoutError('lineage timed out'))
def test_lineage_timeout_returns_504(_mock_resolve_genealogy):
    """A TimeoutError from the engine maps to 504 / LINEAGE_TIMEOUT."""
    body = {
        'profile': 'query_tool',
        'container_ids': ['CID-001'],
    }
    response = _client().post('/api/trace/lineage', json=body)
    assert response.status_code == 504
    payload = response.get_json()
    assert payload['error']['code'] == 'LINEAGE_TIMEOUT'
@patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 6))
def test_lineage_rate_limited_returns_429(_mock_rate_limit):
    """When the limiter trips, lineage replies 429 with Retry-After."""
    body = {
        'profile': 'query_tool',
        'container_ids': ['CID-001'],
    }
    response = _client().post('/api/trace/lineage', json=body)
    assert response.status_code == 429
    # Retry-After mirrors the seconds reported by the limiter.
    assert response.headers.get('Retry-After') == '6'
    payload = response.get_json()
    assert payload['error']['code'] == 'TOO_MANY_REQUESTS'
@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events')
def test_events_partial_failure_returns_200_with_code(mock_fetch_events):
    """One failing event domain still yields 200, flagged as partial failure."""
    def fake_fetch(_container_ids, domain):
        # Only the history domain succeeds; every other domain raises.
        if domain != 'history':
            raise RuntimeError('domain failed')
        return {
            'CID-001': [{'CONTAINERID': 'CID-001', 'EVENTTYPE': 'TRACK_IN'}]
        }

    mock_fetch_events.side_effect = fake_fetch
    body = {
        'profile': 'query_tool',
        'container_ids': ['CID-001'],
        'domains': ['history', 'materials'],
    }
    response = _client().post('/api/trace/events', json=body)
    assert response.status_code == 200
    payload = response.get_json()
    assert payload['stage'] == 'events'
    assert payload['code'] == 'EVENTS_PARTIAL_FAILURE'
    assert 'materials' in payload['failed_domains']
    assert payload['results']['history']['count'] == 1
@patch('mes_dashboard.routes.trace_routes.build_trace_aggregation_from_events')
@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events')
def test_events_mid_section_defect_with_aggregation(
    mock_fetch_events,
    mock_build_aggregation,
):
    """mid_section_defect profile attaches the aggregation block to the response."""
    mock_fetch_events.return_value = {
        'CID-001': [
            {
                'CONTAINERID': 'CID-001',
                'WORKCENTER_GROUP': '測試',
                'EQUIPMENTID': 'EQ-01',
                'EQUIPMENTNAME': 'EQ-01',
            }
        ]
    }
    mock_build_aggregation.return_value = {
        'kpi': {'total_input': 100},
        'charts': {'by_station': []},
        'daily_trend': [],
        'available_loss_reasons': [],
        'genealogy_status': 'ready',
        'detail_total_count': 0,
    }
    body = {
        'profile': 'mid_section_defect',
        'container_ids': ['CID-001'],
        'domains': ['upstream_history'],
        'params': {
            'start_date': '2025-01-01',
            'end_date': '2025-01-31',
        },
        'lineage': {'ancestors': {'CID-001': ['CID-A']}},
        'seed_container_ids': ['CID-001'],
    }
    response = _client().post('/api/trace/events', json=body)
    assert response.status_code == 200
    payload = response.get_json()
    aggregation = payload['aggregation']
    assert aggregation['kpi']['total_input'] == 100
    assert aggregation['genealogy_status'] == 'ready'
    mock_build_aggregation.assert_called_once()
@patch('mes_dashboard.core.rate_limit.check_and_record', return_value=(True, 5))
def test_events_rate_limited_returns_429(_mock_rate_limit):
    """When the limiter trips, the events stage replies 429 with Retry-After."""
    body = {
        'profile': 'query_tool',
        'container_ids': ['CID-001'],
        'domains': ['history'],
    }
    response = _client().post('/api/trace/events', json=body)
    assert response.status_code == 429
    # Retry-After mirrors the seconds reported by the limiter.
    assert response.headers.get('Retry-After') == '5'
    payload = response.get_json()
    assert payload['error']['code'] == 'TOO_MANY_REQUESTS'