feat(query-tool): align lineage model and tighten timeline mapping

This commit is contained in:
egg
2026-02-22 17:36:47 +08:00
parent 6016c31e4d
commit 9890586191
29 changed files with 2090 additions and 299 deletions

View File

@@ -242,3 +242,70 @@ def test_split_ancestors_matches_legacy_bfs_for_five_known_lots(mock_read_sql_df
assert connect_by_result["child_to_parent"] == legacy_child_to_parent
assert connect_by_result["cid_to_name"] == legacy_cid_to_name
@patch("mes_dashboard.services.lineage_engine.LineageEngine._build_semantic_links")
@patch("mes_dashboard.services.lineage_engine.LineageEngine._resolve_container_snapshot")
@patch("mes_dashboard.services.lineage_engine.LineageEngine.resolve_merge_sources")
@patch("mes_dashboard.services.lineage_engine.LineageEngine.resolve_split_ancestors")
def test_resolve_full_genealogy_includes_semantic_edges(
mock_resolve_split_ancestors,
mock_resolve_merge_sources,
mock_resolve_container_snapshot,
mock_build_semantic_links,
):
mock_resolve_split_ancestors.return_value = {
"child_to_parent": {"GD-LOT": "SRC-LOT"},
"cid_to_name": {
"GD-LOT": "GD25060502-A11",
"SRC-LOT": "56014S00T-5K07R",
},
}
mock_resolve_merge_sources.return_value = {}
snapshots = {
"GD-LOT": {
"CONTAINERID": "GD-LOT",
"CONTAINERNAME": "GD25060502-A11",
"MFGORDERNAME": "GD25060502",
"OBJECTTYPE": "LOT",
"FIRSTNAME": "56014S00T-5K07R",
"ORIGINALCONTAINERID": "SRC-LOT",
"SPLITFROMID": "SRC-LOT",
},
"SRC-LOT": {
"CONTAINERID": "SRC-LOT",
"CONTAINERNAME": "56014S00T-5K07R",
"MFGORDERNAME": None,
"OBJECTTYPE": "LOT",
"FIRSTNAME": "56014S00T-5K07R",
"ORIGINALCONTAINERID": None,
"SPLITFROMID": None,
},
"WAFER-LOT": {
"CONTAINERID": "WAFER-LOT",
"CONTAINERNAME": "56014S00T-5K07R",
"MFGORDERNAME": None,
"OBJECTTYPE": "LOT",
"FIRSTNAME": "56014S00T-5K07R",
"ORIGINALCONTAINERID": None,
"SPLITFROMID": None,
},
}
mock_resolve_container_snapshot.return_value = snapshots
mock_build_semantic_links.return_value = (
snapshots,
[
("WAFER-LOT", "GD-LOT", "wafer_origin"),
("SRC-LOT", "GD-LOT", "gd_rework_source"),
],
{"WAFER-LOT"},
)
result = LineageEngine.resolve_full_genealogy(["GD-LOT"], {"GD-LOT": "GD25060502-A11"})
assert "GD-LOT" in result["parent_map"]
assert "SRC-LOT" in result["parent_map"]["GD-LOT"]
assert "WAFER-LOT" in result["parent_map"]["GD-LOT"]
edge_types = {edge["edge_type"] for edge in result["edges"]}
assert "wafer_origin" in edge_types
assert "gd_rework_source" in edge_types