Files
DashBoard/tests/test_lineage_engine.py
commit 5b358d71c1 feat(query-tool): rewrite frontend with ECharts tree, multi-select, and modular composables
Replace the monolithic useQueryToolData composable and nested Vue component tree
with a modular architecture: useLotResolve, useLotLineage, useLotDetail, and
useEquipmentQuery. Introduce ECharts TreeChart (LR orthogonal layout) for lot
lineage visualization with multi-select support, subtree expansion, zoom/pan,
and serial number normalization. Add unified LineageEngine backend with split
descendant traversal and leaf serial number queries. Archive the query-tool-rewrite
openspec change and sync delta specs to main.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 15:25:00 +08:00

245 lines
7.8 KiB
Python

# -*- coding: utf-8 -*-
"""Unit tests for LineageEngine."""
from __future__ import annotations
from unittest.mock import patch
import pandas as pd
from mes_dashboard.services.lineage_engine import LineageEngine
@patch("mes_dashboard.services.lineage_engine.read_sql_df")
def test_resolve_split_ancestors_batches_and_enforces_max_depth(mock_read_sql_df):
    """1001 seeds must be split into two SQL batches (1000 + 1), and rows
    past the 20-level CONNECT BY cap must be dropped from the result."""
    seed_ids = [f"C{i:04d}" for i in range(1001)]
    batch_one = pd.DataFrame(
        {
            "CONTAINERID": ["C0000", "P0000"],
            "SPLITFROMID": ["P0000", None],
            "CONTAINERNAME": ["LOT-0000", "LOT-P0000"],
            "SPLIT_DEPTH": [1, 2],
        }
    )
    batch_two = pd.DataFrame(
        {
            "CONTAINERID": ["C1000", "C-TOO-DEEP"],
            "SPLITFROMID": ["P1000", "P-TOO-DEEP"],
            "CONTAINERNAME": ["LOT-1000", "LOT-DEEP"],
            "SPLIT_DEPTH": [1, 21],
        }
    )
    mock_read_sql_df.side_effect = [batch_one, batch_two]

    result = LineageEngine.resolve_split_ancestors(seed_ids, {"INIT": "LOT-INIT"})

    assert mock_read_sql_df.call_count == 2
    sql_a, params_a = mock_read_sql_df.call_args_list[0].args
    sql_b, params_b = mock_read_sql_df.call_args_list[1].args
    # The depth guard must appear in every batched statement.
    assert "LEVEL <= 20" in sql_a
    assert "LEVEL <= 20" in sql_b
    # Batching: 1001 container ids -> 1000 binds, then 1 bind.
    assert len(params_a) == 1000
    assert len(params_b) == 1
    assert result["child_to_parent"]["C0000"] == "P0000"
    assert result["child_to_parent"]["C1000"] == "P1000"
    # The SPLIT_DEPTH=21 row exceeds the cap and must be excluded.
    assert "C-TOO-DEEP" not in result["child_to_parent"]
    assert result["cid_to_name"]["C0000"] == "LOT-0000"
    # Pre-seeded names passed in by the caller survive the merge.
    assert result["cid_to_name"]["INIT"] == "LOT-INIT"
@patch("mes_dashboard.services.lineage_engine.read_sql_df")
def test_resolve_merge_sources_batches_and_returns_mapping(mock_read_sql_df):
    """Merge-source lookup batches 1001 finished names into two queries,
    renders the SQL template placeholder, and collapses duplicate sources.
    The second batch also includes a null FINISHEDNAME row that should be
    ignored by the mapping."""
    finished_names = [f"FN{i:04d}" for i in range(1001)]
    mock_read_sql_df.side_effect = [
        pd.DataFrame(
            {
                "FINISHEDNAME": ["FN0000", "FN0000"],
                "SOURCE_CID": ["SRC-A", "SRC-B"],
            }
        ),
        pd.DataFrame(
            {
                "FINISHEDNAME": ["FN1000", "FN1000", None],
                "SOURCE_CID": ["SRC-C", "SRC-C", "SRC-INVALID"],
            }
        ),
    ]

    mapping = LineageEngine.resolve_merge_sources(finished_names)

    assert mock_read_sql_df.call_count == 2
    sql_a, params_a = mock_read_sql_df.call_args_list[0].args
    sql_b, params_b = mock_read_sql_df.call_args_list[1].args
    # The template placeholder must be substituted before execution.
    assert "{{ FINISHED_NAME_FILTER }}" not in sql_a
    assert "{{ FINISHED_NAME_FILTER }}" not in sql_b
    # Batching: 1001 names -> 1000 binds, then 1 bind.
    assert len(params_a) == 1000
    assert len(params_b) == 1
    # Distinct sources are kept; an exact duplicate (SRC-C) collapses.
    assert mapping["FN0000"] == ["SRC-A", "SRC-B"]
    assert mapping["FN1000"] == ["SRC-C"]
@patch("mes_dashboard.services.lineage_engine.LineageEngine.resolve_merge_sources")
@patch("mes_dashboard.services.lineage_engine.LineageEngine.resolve_split_ancestors")
def test_resolve_full_genealogy_combines_split_and_merge(
    mock_resolve_split_ancestors,
    mock_resolve_merge_sources,
):
    """Full genealogy chains three passes: split ancestors of the seed,
    merge sources of the discovered lot names, then split ancestors of
    those merge sources.

    Fixture: A -> B -> C via split, plus LOT-B merged from M1 (itself
    split from M0), so A's ancestor set must contain B, C, M1 and M0.
    """
    # First call resolves the seed's split chain; second call resolves the
    # split chain of the merge source discovered in between.
    mock_resolve_split_ancestors.side_effect = [
        {
            "child_to_parent": {"A": "B", "B": "C"},
            "cid_to_name": {"A": "LOT-A", "B": "LOT-B", "C": "LOT-C"},
        },
        {
            "child_to_parent": {"M1": "M0"},
            "cid_to_name": {"M1": "LOT-M1", "M0": "LOT-M0"},
        },
    ]
    mock_resolve_merge_sources.return_value = {"LOT-B": ["M1"]}

    result = LineageEngine.resolve_full_genealogy(["A"], {"A": "LOT-A"})

    assert result["ancestors"] == {"A": {"B", "C", "M1", "M0"}}
    assert result["cid_to_name"]["A"] == "LOT-A"
    assert result["cid_to_name"]["M0"] == "LOT-M0"
    # parent_map should have direct edges only.
    pm = result["parent_map"]
    assert pm["A"] == ["B"]
    # Order-insensitive but duplicate-sensitive: the previous
    # `pm["B"] == [...] or set(pm["B"]) == {...}` form would have
    # accepted a list with duplicated parents.
    assert sorted(pm["B"]) == ["C", "M1"]
    assert pm["M1"] == ["M0"]
    # merge_edges: B -> M1 (LOT-B matched a merge source).
    me = result["merge_edges"]
    assert "M1" in me.get("B", [])
    assert mock_resolve_split_ancestors.call_count == 2
    mock_resolve_merge_sources.assert_called_once()
@patch("mes_dashboard.services.lineage_engine.read_sql_df")
def test_split_ancestors_matches_legacy_bfs_for_five_known_lots(mock_read_sql_df):
    """The CONNECT BY-based resolve_split_ancestors must agree with the
    legacy BFS it replaced.

    The fixture covers five seed shapes: a 2-hop chain (L1), a 1-hop
    chain (L2), a root with no parent (L3), a 3-hop chain (L4), and a
    parent cycle L5P1 <-> L5P2 that must not loop forever.
    """
    parent_by_cid = {
        "L1": "L1P1",
        "L1P1": "L1P2",
        "L2": "L2P1",
        "L3": None,
        "L4": "L4P1",
        "L4P1": "L4P2",
        "L4P2": "L4P3",
        "L5": "L5P1",
        "L5P1": "L5P2",
        "L5P2": "L5P1",  # deliberate cycle back to L5P1
    }
    name_by_cid = {
        "L1": "LOT-1",
        "L1P1": "LOT-1-P1",
        "L1P2": "LOT-1-P2",
        "L2": "LOT-2",
        "L2P1": "LOT-2-P1",
        "L3": "LOT-3",
        "L4": "LOT-4",
        "L4P1": "LOT-4-P1",
        "L4P2": "LOT-4-P2",
        "L4P3": "LOT-4-P3",
        "L5": "LOT-5",
        "L5P1": "LOT-5-P1",
        "L5P2": "LOT-5-P2",
    }
    seed_lots = ["L1", "L2", "L3", "L4", "L5"]

    def _connect_by_rows(start_cids):
        """Simulate the Oracle CONNECT BY result: walk each seed's parent
        chain, capped at depth 20 and guarded against cycles."""
        rows = []
        for seed in start_cids:
            current = seed
            depth = 1
            visited = set()
            while current and depth <= 20 and current not in visited:
                visited.add(current)
                rows.append(
                    {
                        "CONTAINERID": current,
                        "SPLITFROMID": parent_by_cid.get(current),
                        "CONTAINERNAME": name_by_cid.get(current),
                        "SPLIT_DEPTH": depth,
                    }
                )
                current = parent_by_cid.get(current)
                depth += 1
        return pd.DataFrame(rows)

    def _mock_read_sql(_sql, params):
        # Bind-parameter values are the requested container ids.
        # (was: a no-op copy comprehension over params.values())
        return _connect_by_rows(list(params.values()))

    mock_read_sql_df.side_effect = _mock_read_sql
    connect_by_result = LineageEngine.resolve_split_ancestors(seed_lots)

    # Legacy BFS reference implementation from the previous
    # mid_section_defect_service.
    legacy_child_to_parent = {}
    legacy_cid_to_name = {}
    frontier = list(seed_lots)
    seen = set(seed_lots)
    rounds = 0
    while frontier:
        rounds += 1
        batch_rows = [
            {
                "CONTAINERID": cid,
                "SPLITFROMID": parent_by_cid.get(cid),
                "CONTAINERNAME": name_by_cid.get(cid),
            }
            for cid in frontier
        ]
        new_parents = set()
        for row in batch_rows:
            cid = row["CONTAINERID"]
            split_from = row["SPLITFROMID"]
            name = row["CONTAINERNAME"]
            if isinstance(name, str) and name:
                legacy_cid_to_name[cid] = name
            # A null or self-referential parent terminates the chain.
            if isinstance(split_from, str) and split_from and split_from != cid:
                legacy_child_to_parent[cid] = split_from
                if split_from not in seen:
                    seen.add(split_from)
                    new_parents.add(split_from)
        frontier = list(new_parents)
        if rounds > 20:  # safety valve mirroring the 20-level depth cap
            break

    assert connect_by_result["child_to_parent"] == legacy_child_to_parent
    assert connect_by_result["cid_to_name"] == legacy_cid_to_name