# -*- coding: utf-8 -*- """Unit tests for WIP service layer. Tests the WIP query functions that use DW_MES_LOT_V view. """ import unittest from unittest.mock import patch, MagicMock from functools import wraps import pandas as pd import threading import time from concurrent.futures import ThreadPoolExecutor from mes_dashboard.services.wip_service import ( WIP_VIEW, get_wip_summary, get_wip_matrix, get_wip_hold_summary, get_wip_detail, get_hold_detail_summary, get_hold_detail_lots, get_hold_overview_treemap, get_workcenters, get_packages, get_wip_filter_options, search_workorders, search_lot_ids, ) def disable_cache(func): """Decorator to disable Redis cache for Oracle fallback tests.""" @wraps(func) def wrapper(*args, **kwargs): import mes_dashboard.services.wip_service as wip_service with wip_service._wip_search_index_lock: wip_service._wip_search_index_cache.clear() with wip_service._wip_snapshot_lock: wip_service._wip_snapshot_cache.clear() with patch('mes_dashboard.services.wip_service.get_cached_wip_data', return_value=None): with patch('mes_dashboard.services.wip_service.get_cached_sys_date', return_value=None): return func(*args, **kwargs) return wrapper class TestWipServiceConfig(unittest.TestCase): """Test WIP service configuration.""" def test_wip_view_configured(self): """WIP_VIEW should be configured correctly.""" self.assertEqual(WIP_VIEW, "DWH.DW_MES_LOT_V") class TestGetWipSummary(unittest.TestCase): """Test get_wip_summary function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_none_on_empty_result(self, mock_read_sql): """Should return None when query returns empty DataFrame.""" mock_read_sql.return_value = pd.DataFrame() result = get_wip_summary() self.assertIsNone(result) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_none_on_exception(self, mock_read_sql): """Should return None when query raises exception.""" mock_read_sql.side_effect = Exception("Database error") result = get_wip_summary() self.assertIsNone(result) class TestGetWipMatrix(unittest.TestCase): """Test get_wip_matrix function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_matrix_structure(self, mock_read_sql): """Should return dict with matrix structure.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['切割', '切割', '焊接_DB'], 'WORKCENTERSEQUENCE_GROUP': [1, 1, 2], 'PACKAGE_LEF': ['SOT-23', 'SOD-323', 'SOT-23'], 'QTY': [50000000, 30000000, 40000000] }) mock_read_sql.return_value = mock_df result = get_wip_matrix() self.assertIsNotNone(result) self.assertIn('workcenters', result) self.assertIn('packages', result) self.assertIn('matrix', result) self.assertIn('workcenter_totals', result) self.assertIn('package_totals', result) self.assertIn('grand_total', result) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_workcenters_sorted_by_sequence(self, mock_read_sql): """Workcenters should be sorted by WORKCENTERSEQUENCE_GROUP.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['焊接_DB', '切割'], 'WORKCENTERSEQUENCE_GROUP': [2, 1], 'PACKAGE_LEF': ['SOT-23', 'SOT-23'], 'QTY': [40000000, 50000000] }) mock_read_sql.return_value = mock_df result = get_wip_matrix() self.assertEqual(result['workcenters'], ['切割', '焊接_DB']) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_packages_sorted_by_qty_desc(self, mock_read_sql): """Packages should be sorted by total QTY descending.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['切割', '切割'], 'WORKCENTERSEQUENCE_GROUP': [1, 1], 'PACKAGE_LEF': ['SOD-323', 'SOT-23'], 'QTY': [30000000, 50000000] }) mock_read_sql.return_value = mock_df result = get_wip_matrix() self.assertEqual(result['packages'][0], 'SOT-23') # Higher QTY first @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_structure_on_empty_result(self, mock_read_sql): """Should return empty structure when no data.""" mock_read_sql.return_value = pd.DataFrame() result = get_wip_matrix() self.assertIsNotNone(result) self.assertEqual(result['workcenters'], []) self.assertEqual(result['packages'], []) self.assertEqual(result['grand_total'], 0) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_calculates_totals_correctly(self, mock_read_sql): """Should calculate workcenter and package totals correctly.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['切割', '切割'], 'WORKCENTERSEQUENCE_GROUP': [1, 1], 'PACKAGE_LEF': ['SOT-23', 'SOD-323'], 'QTY': [50000000, 30000000] }) mock_read_sql.return_value = mock_df result = get_wip_matrix() self.assertEqual(result['workcenter_totals']['切割'], 80000000) self.assertEqual(result['package_totals']['SOT-23'], 50000000) self.assertEqual(result['grand_total'], 80000000) class TestGetWipHoldSummary(unittest.TestCase): """Test get_wip_hold_summary function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_hold_items(self, mock_read_sql): """Should return list of hold items.""" mock_df = pd.DataFrame({ 'REASON': ['YieldLimit', '特殊需求管控'], 'LOTS': [21, 44], 'QTY': [1084443, 4235060] }) mock_read_sql.return_value = mock_df result = get_wip_hold_summary() self.assertIsNotNone(result) self.assertIn('items', result) self.assertEqual(len(result['items']), 2) self.assertEqual(result['items'][0]['reason'], 'YieldLimit') self.assertEqual(result['items'][0]['lots'], 21) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_items_on_no_holds(self, mock_read_sql): """Should return empty items list when no holds.""" mock_read_sql.return_value = pd.DataFrame() result = get_wip_hold_summary() self.assertIsNotNone(result) self.assertEqual(result['items'], []) class TestGetWipDetail(unittest.TestCase): """Test get_wip_detail function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_none_on_empty_summary(self, mock_read_sql): """Should return None when summary query returns empty.""" mock_read_sql.return_value = pd.DataFrame() result = get_wip_detail('不存在的工站') self.assertIsNone(result) class TestGetWorkcenters(unittest.TestCase): """Test get_workcenters function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_workcenter_list(self, mock_read_sql): """Should return list of workcenters with lot counts.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['切割', '焊接_DB'], 'WORKCENTERSEQUENCE_GROUP': [1, 2], 'LOT_COUNT': [1377, 859] }) mock_read_sql.return_value = mock_df result = get_workcenters() self.assertIsNotNone(result) self.assertEqual(len(result), 2) self.assertEqual(result[0]['name'], '切割') self.assertEqual(result[0]['lot_count'], 1377) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_list_on_no_data(self, mock_read_sql): """Should return empty list when no workcenters.""" mock_read_sql.return_value = pd.DataFrame() result = get_workcenters() self.assertEqual(result, []) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_none_on_exception(self, mock_read_sql): """Should return None on exception.""" mock_read_sql.side_effect = Exception("Database error") result = get_workcenters() self.assertIsNone(result) class TestGetPackages(unittest.TestCase): """Test get_packages function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_package_list(self, mock_read_sql): """Should return list of packages with lot counts.""" mock_df = pd.DataFrame({ 'PACKAGE_LEF': ['SOT-23', 'SOD-323'], 'LOT_COUNT': [2234, 1392] }) mock_read_sql.return_value = mock_df result = get_packages() self.assertIsNotNone(result) self.assertEqual(len(result), 2) self.assertEqual(result[0]['name'], 'SOT-23') self.assertEqual(result[0]['lot_count'], 2234) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_list_on_no_data(self, mock_read_sql): """Should return empty list when no packages.""" mock_read_sql.return_value = pd.DataFrame() result = get_packages() self.assertEqual(result, []) class TestGetWipFilterOptions(unittest.TestCase): """Test get_wip_filter_options function.""" def setUp(self): import mes_dashboard.services.wip_service as wip_service with wip_service._wip_search_index_lock: wip_service._wip_search_index_cache.clear() with wip_service._wip_snapshot_lock: wip_service._wip_snapshot_cache.clear() @patch('mes_dashboard.services.wip_service._get_wip_search_index') def test_prefers_search_index_payload(self, mock_get_index): mock_get_index.return_value = { 'workorders': ['WO1', 'WO2'], 'lotids': ['LOT1'], 'packages': ['PKG1'], 'types': ['TYPE1'], 'firstnames': ['WF001'], 'waferdescs': ['SiC'], } result = get_wip_filter_options() self.assertEqual(result['workorders'], ['WO1', 'WO2']) self.assertEqual(result['firstnames'], ['WF001']) self.assertEqual(result['waferdescs'], ['SiC']) @patch('mes_dashboard.services.wip_service._get_wip_search_index', return_value=None) @patch('mes_dashboard.services.wip_service._get_wip_dataframe') def test_interdependent_options_follow_cross_filters(self, mock_cached_wip, _mock_get_index): mock_cached_wip.return_value = pd.DataFrame({ 'WORKORDER': ['WO1', 'WO1', 'WO2'], 'LOTID': ['L1', 'L2', 'L3'], 'PACKAGE_LEF': ['PKG-A', 'PKG-B', 'PKG-B'], 'PJ_TYPE': ['TYPE-1', 'TYPE-1', 'TYPE-2'], 'FIRSTNAME': ['WF-A', 'WF-B', 'WF-A'], 'WAFERDESC': ['SiC', 'SiC', 'Si'], 'EQUIPMENTCOUNT': [0, 1, 0], 'CURRENTHOLDCOUNT': [1, 0, 0], 'QTY': [10, 20, 30], 'HOLDREASONNAME': ['Q-Check', None, None], 'WORKCENTER_GROUP': ['WC-A', 'WC-A', 'WC-B'], }) result = get_wip_filter_options(workorder='WO1') # Exclude-self semantics: workorder options still show values allowed by other filters. self.assertEqual(result['workorders'], ['WO1', 'WO2']) self.assertEqual(result['lotids'], ['L1', 'L2']) self.assertEqual(result['types'], ['TYPE-1']) self.assertEqual(result['waferdescs'], ['SiC']) @patch('mes_dashboard.services.wip_service._get_wip_search_index', return_value=None) @patch('mes_dashboard.services.wip_service._select_with_snapshot_indexes') @patch('mes_dashboard.services.wip_service._get_wip_dataframe') def test_falls_back_to_cache_dataframe( self, mock_cached_wip, mock_select_with_snapshot, _mock_get_index, ): mock_cached_wip.return_value = pd.DataFrame({'WORKORDER': ['WO1']}) mock_select_with_snapshot.return_value = pd.DataFrame({ 'WORKORDER': ['WO2', 'WO1'], 'LOTID': ['LOT2', 'LOT1'], 'PACKAGE_LEF': ['PKG2', 'PKG1'], 'PJ_TYPE': ['TYPE2', 'TYPE1'], 'FIRSTNAME': ['WF002', 'WF001'], 'WAFERDESC': ['Si', 'SiC'], }) result = get_wip_filter_options() self.assertEqual(result['workorders'], ['WO1', 'WO2']) self.assertEqual(result['firstnames'], ['WF001', 'WF002']) self.assertEqual(result['waferdescs'], ['Si', 'SiC']) class TestSearchWorkorders(unittest.TestCase): """Test search_workorders function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_matching_workorders(self, mock_read_sql): """Should return list of matching WORKORDER values.""" mock_df = pd.DataFrame({ 'WORKORDER': ['GA26012001', 'GA26012002', 'GA26012003'] }) mock_read_sql.return_value = mock_df result = search_workorders('GA26') self.assertIsNotNone(result) self.assertEqual(len(result), 3) self.assertEqual(result[0], 'GA26012001') @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_list_for_short_query(self, mock_read_sql): """Should return empty list for query < 2 characters.""" result = search_workorders('G') self.assertEqual(result, []) mock_read_sql.assert_not_called() @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_list_for_empty_query(self, mock_read_sql): """Should return empty list for empty query.""" result = search_workorders('') self.assertEqual(result, []) mock_read_sql.assert_not_called() @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_list_on_no_matches(self, mock_read_sql): """Should return empty list when no matches found.""" mock_read_sql.return_value = pd.DataFrame() result = search_workorders('NONEXISTENT') self.assertEqual(result, []) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_respects_limit_parameter(self, mock_read_sql): """Should respect the limit parameter.""" mock_df = pd.DataFrame({ 'WORKORDER': ['GA26012001', 'GA26012002'] }) mock_read_sql.return_value = mock_df result = search_workorders('GA26', limit=2) self.assertEqual(len(result), 2) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_caps_limit_at_50(self, mock_read_sql): """Should cap limit at 50.""" mock_df = pd.DataFrame({'WORKORDER': ['GA26012001']}) mock_read_sql.return_value = mock_df search_workorders('GA26', limit=100) # Verify params contain row_limit=50 (capped from 100) call_args = mock_read_sql.call_args params = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('params', {}) self.assertEqual(params.get('row_limit'), 50) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_none_on_exception(self, mock_read_sql): """Should return None on exception.""" mock_read_sql.side_effect = Exception("Database error") result = search_workorders('GA26') self.assertIsNone(result) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_excludes_dummy_by_default(self, mock_read_sql): """Should exclude DUMMY lots by default.""" mock_df = pd.DataFrame({'WORKORDER': []}) mock_read_sql.return_value = mock_df search_workorders('GA26') call_args = mock_read_sql.call_args[0][0] self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_includes_dummy_when_specified(self, mock_read_sql): """Should include DUMMY lots when include_dummy=True.""" mock_df = pd.DataFrame({'WORKORDER': []}) mock_read_sql.return_value = mock_df search_workorders('GA26', include_dummy=True) call_args = mock_read_sql.call_args[0][0] self.assertNotIn("LOTID NOT LIKE '%DUMMY%'", call_args) class TestSearchLotIds(unittest.TestCase): """Test search_lot_ids function.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_matching_lotids(self, mock_read_sql): """Should return list of matching LOTID values.""" mock_df = pd.DataFrame({ 'LOTID': ['GA26012345-A00-001', 'GA26012345-A00-002'] }) mock_read_sql.return_value = mock_df result = search_lot_ids('GA26012345') self.assertIsNotNone(result) self.assertEqual(len(result), 2) self.assertEqual(result[0], 'GA26012345-A00-001') @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_list_for_short_query(self, mock_read_sql): """Should return empty list for query < 2 characters.""" result = search_lot_ids('G') self.assertEqual(result, []) mock_read_sql.assert_not_called() @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_empty_list_on_no_matches(self, mock_read_sql): """Should return empty list when no matches found.""" mock_read_sql.return_value = pd.DataFrame() result = search_lot_ids('NONEXISTENT') self.assertEqual(result, []) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_returns_none_on_exception(self, mock_read_sql): """Should return None on exception.""" mock_read_sql.side_effect = Exception("Database error") result = search_lot_ids('GA26') self.assertIsNone(result) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_excludes_dummy_by_default(self, mock_read_sql): """Should exclude DUMMY lots by default.""" mock_df = pd.DataFrame({'LOTID': []}) mock_read_sql.return_value = mock_df search_lot_ids('GA26') call_args = mock_read_sql.call_args[0][0] self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) class TestWipSearchIndexShortcut(unittest.TestCase): """Test derived search index fast-path behavior.""" @patch('mes_dashboard.services.wip_service._search_workorders_from_oracle') @patch('mes_dashboard.services.wip_service._get_wip_search_index') def test_workorder_search_uses_index_without_cross_filters(self, mock_index, mock_oracle): mock_index.return_value = { "workorders": ["GA26012001", "GA26012002", "GB00000001"] } result = search_workorders("GA26", limit=10) self.assertEqual(result, ["GA26012001", "GA26012002"]) mock_oracle.assert_not_called() @patch('mes_dashboard.services.wip_service._search_workorders_from_oracle') @patch('mes_dashboard.services.wip_service._get_wip_search_index') def test_workorder_search_with_cross_filters_falls_back(self, mock_index, mock_oracle): mock_index.return_value = { "workorders": ["GA26012001", "GA26012002"] } mock_oracle.return_value = ["GA26012001"] result = search_workorders("GA26", package="SOT-23") self.assertEqual(result, ["GA26012001"]) mock_oracle.assert_called_once() class TestWipSnapshotLocking(unittest.TestCase): """Concurrency behavior for snapshot cache build path.""" def setUp(self): import mes_dashboard.services.wip_service as wip_service with wip_service._wip_snapshot_lock: wip_service._wip_snapshot_cache.clear() @staticmethod def _sample_df() -> pd.DataFrame: return pd.DataFrame({ "WORKORDER": ["WO1", "WO2"], "LOTID": ["LOT1", "LOT2"], "QTY": [100, 200], "EQUIPMENTCOUNT": [1, 0], "CURRENTHOLDCOUNT": [0, 1], "HOLDREASONNAME": [None, "品質確認"], "WORKCENTER_GROUP": ["WC-A", "WC-B"], "PACKAGE_LEF": ["PKG-A", "PKG-B"], "PJ_TYPE": ["T1", "T2"], }) def test_concurrent_snapshot_miss_builds_once(self): import mes_dashboard.services.wip_service as wip_service df = self._sample_df() build_count_lock = threading.Lock() build_count = 0 def slow_build(snapshot_df, include_dummy, version): nonlocal build_count with build_count_lock: build_count += 1 time.sleep(0.05) return { "version": version, "built_at": "2026-02-10T00:00:00", "row_count": int(len(snapshot_df)), "frame": snapshot_df, "indexes": {}, "frame_bytes": 0, "index_bucket_count": 0, } start_event = threading.Event() def call_snapshot(): start_event.wait(timeout=1) return wip_service._get_wip_snapshot(include_dummy=False) with patch.object(wip_service, "_get_wip_cache_version", return_value="version-1"): with patch.object(wip_service, "_get_wip_dataframe", return_value=df) as mock_get_df: with patch.object(wip_service, "_build_wip_snapshot", side_effect=slow_build): with ThreadPoolExecutor(max_workers=6) as pool: futures = [pool.submit(call_snapshot) for _ in range(6)] start_event.set() results = [future.result(timeout=3) for future in futures] self.assertEqual(build_count, 1) self.assertEqual(mock_get_df.call_count, 1) self.assertTrue(all(result is not None for result in results)) self.assertTrue(all(result.get("version") == "version-1" for result in results)) class TestDummyExclusionInAllFunctions(unittest.TestCase): """Test DUMMY exclusion is applied in all WIP functions.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_summary_excludes_dummy_by_default(self, mock_read_sql): """get_wip_summary should exclude DUMMY by default.""" mock_df = pd.DataFrame({ 'TOTAL_LOTS': [100], 'TOTAL_QTY_PCS': [1000], 'RUN_LOTS': [80], 'RUN_QTY_PCS': [800], 'QUEUE_LOTS': [10], 'QUEUE_QTY_PCS': [100], 'HOLD_LOTS': [10], 'HOLD_QTY_PCS': [100], 'DATA_UPDATE_DATE': ['2026-01-26'] }) mock_read_sql.return_value = mock_df get_wip_summary() call_args = mock_read_sql.call_args[0][0] self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_summary_includes_dummy_when_specified(self, mock_read_sql): """get_wip_summary should include DUMMY when specified.""" mock_df = pd.DataFrame({ 'TOTAL_LOTS': [100], 'TOTAL_QTY_PCS': [1000], 'RUN_LOTS': [80], 'RUN_QTY_PCS': [800], 'QUEUE_LOTS': [10], 'QUEUE_QTY_PCS': [100], 'HOLD_LOTS': [10], 'HOLD_QTY_PCS': [100], 'DATA_UPDATE_DATE': ['2026-01-26'] }) mock_read_sql.return_value = mock_df get_wip_summary(include_dummy=True) call_args = mock_read_sql.call_args[0][0] self.assertNotIn("LOTID NOT LIKE '%DUMMY%'", call_args) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_matrix_excludes_dummy_by_default(self, mock_read_sql): """get_wip_matrix should exclude DUMMY by default.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['切割'], 'WORKCENTERSEQUENCE_GROUP': [1], 'PACKAGE_LEF': ['SOT-23'], 'QTY': [1000] }) mock_read_sql.return_value = mock_df get_wip_matrix() call_args = mock_read_sql.call_args[0][0] self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_hold_summary_excludes_dummy_by_default(self, mock_read_sql): """get_wip_hold_summary should exclude DUMMY by default.""" mock_df = pd.DataFrame({ 'REASON': ['YieldLimit'], 'LOTS': [10], 'QTY': [1000] }) mock_read_sql.return_value = mock_df get_wip_hold_summary() call_args = mock_read_sql.call_args[0][0] self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_workcenters_excludes_dummy_by_default(self, mock_read_sql): """get_workcenters should exclude DUMMY by default.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['切割'], 'WORKCENTERSEQUENCE_GROUP': [1], 'LOT_COUNT': [100] }) mock_read_sql.return_value = mock_df get_workcenters() call_args = mock_read_sql.call_args[0][0] self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_packages_excludes_dummy_by_default(self, mock_read_sql): """get_packages should exclude DUMMY by default.""" mock_df = pd.DataFrame({ 'PACKAGE_LEF': ['SOT-23'], 'LOT_COUNT': [100] }) mock_read_sql.return_value = mock_df get_packages() call_args = mock_read_sql.call_args[0][0] self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) class TestMultipleFilterConditions(unittest.TestCase): """Test multiple filter conditions work together.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_summary_with_all_filters(self, mock_read_sql): """get_wip_summary should combine all filter conditions via parameterized queries.""" mock_df = pd.DataFrame({ 'TOTAL_LOTS': [50], 'TOTAL_QTY_PCS': [500], 'RUN_LOTS': [40], 'RUN_QTY_PCS': [400], 'QUEUE_LOTS': [5], 'QUEUE_QTY_PCS': [50], 'HOLD_LOTS': [5], 'HOLD_QTY_PCS': [50], 'QUALITY_HOLD_LOTS': [3], 'QUALITY_HOLD_QTY_PCS': [30], 'NON_QUALITY_HOLD_LOTS': [2], 'NON_QUALITY_HOLD_QTY_PCS': [20], 'DATA_UPDATE_DATE': ['2026-01-26'] }) mock_read_sql.return_value = mock_df get_wip_summary(workorder='GA26', lotid='A00') # Check SQL contains parameterized LIKE conditions call_args = mock_read_sql.call_args sql = call_args[0][0] params = call_args[0][1] if len(call_args[0]) > 1 else {} self.assertIn("WORKORDER", sql) self.assertIn("LOTID", sql) self.assertIn("LIKE", sql) self.assertIn("LOTID NOT LIKE '%DUMMY%'", sql) # Verify params contain the search patterns self.assertTrue(any('%GA26%' in str(v) for v in params.values())) self.assertTrue(any('%A00%' in str(v) for v in params.values())) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_matrix_with_all_filters(self, mock_read_sql): """get_wip_matrix should combine all filter conditions via parameterized queries.""" mock_df = pd.DataFrame({ 'WORKCENTER_GROUP': ['切割'], 'WORKCENTERSEQUENCE_GROUP': [1], 'PACKAGE_LEF': ['SOT-23'], 'QTY': [500] }) mock_read_sql.return_value = mock_df get_wip_matrix(workorder='GA26', lotid='A00', include_dummy=True) # Check SQL contains parameterized LIKE conditions call_args = mock_read_sql.call_args sql = call_args[0][0] params = call_args[0][1] if len(call_args[0]) > 1 else {} self.assertIn("WORKORDER", sql) self.assertIn("LOTID", sql) self.assertIn("LIKE", sql) # Should NOT contain DUMMY exclusion since include_dummy=True self.assertNotIn("LOTID NOT LIKE '%DUMMY%'", sql) # Verify params contain the search patterns self.assertTrue(any('%GA26%' in str(v) for v in params.values())) self.assertTrue(any('%A00%' in str(v) for v in params.values())) class TestHoldOverviewServiceCachePath(unittest.TestCase): """Test hold overview related behavior on cache path.""" def setUp(self): import mes_dashboard.services.wip_service as wip_service with wip_service._wip_search_index_lock: wip_service._wip_search_index_cache.clear() with wip_service._wip_snapshot_lock: wip_service._wip_snapshot_cache.clear() @staticmethod def _sample_hold_df() -> pd.DataFrame: return pd.DataFrame({ 'LOTID': ['L1', 'L2', 'L3', 'L4', 'L5'], 'WORKORDER': ['WO1', 'WO2', 'WO3', 'WO4', 'WO5'], 'QTY': [100, 50, 80, 60, 20], 'PACKAGE_LEF': ['PKG-A', 'PKG-B', 'PKG-A', 'PKG-Z', 'PKG-C'], 'WORKCENTER_GROUP': ['WC-A', 'WC-B', 'WC-A', 'WC-Z', 'WC-C'], 'WORKCENTERSEQUENCE_GROUP': [1, 2, 1, 9, 3], 'HOLDREASONNAME': ['品質確認', '特殊需求管控', '品質確認', None, '設備異常'], 'AGEBYDAYS': [2.0, 3.0, 5.0, 0.3, 1.2], 'EQUIPMENTCOUNT': [0, 0, 0, 1, 0], 'CURRENTHOLDCOUNT': [1, 1, 1, 0, 1], 'SPECNAME': ['S1', 'S2', 'S1', 'S9', 'S3'], 'HOLDEMP': ['EMP1', 'EMP2', 'EMP3', 'EMP4', 'EMP5'], 'DEPTNAME': ['QC', 'PD', 'QC', 'RUN', 'QC'], 'COMMENT_HOLD': ['C1', 'C2', 'C3', 'C4', 'C5'], 'COMMENT_FUTURE': ['FC1', None, 'FC3', None, 'FC5'], 'PRODUCT': ['PROD-A', 'PROD-B', 'PROD-A', 'PROD-Z', 'PROD-C'], 'PJ_TYPE': ['T1', 'T2', 'T1', 'T9', 'T3'], }) @patch('mes_dashboard.services.wip_service.get_cached_sys_date', return_value='2026-02-10 10:00:00') @patch('mes_dashboard.services.wip_service.get_cached_wip_data') def test_get_hold_detail_summary_supports_optional_reason_and_hold_type( self, mock_cached_wip, _mock_sys_date, ): mock_cached_wip.return_value = self._sample_hold_df() reason_summary = get_hold_detail_summary(reason='品質確認') self.assertEqual(reason_summary['totalLots'], 2) self.assertEqual(reason_summary['totalQty'], 180) self.assertEqual(reason_summary['workcenterCount'], 1) self.assertEqual(reason_summary['dataUpdateDate'], '2026-02-10 10:00:00') quality_summary = get_hold_detail_summary(hold_type='quality') self.assertEqual(quality_summary['totalLots'], 3) self.assertEqual(quality_summary['totalQty'], 200) self.assertEqual(quality_summary['workcenterCount'], 2) all_hold_summary = get_hold_detail_summary() self.assertEqual(all_hold_summary['totalLots'], 4) self.assertEqual(all_hold_summary['totalQty'], 250) self.assertEqual(all_hold_summary['workcenterCount'], 3) @patch('mes_dashboard.services.wip_service.get_cached_wip_data') def test_get_hold_detail_lots_returns_hold_reason_and_treemap_filter(self, mock_cached_wip): mock_cached_wip.return_value = self._sample_hold_df() reason_result = get_hold_detail_lots(reason='品質確認', page=1, page_size=10) self.assertEqual(len(reason_result['lots']), 2) self.assertEqual(reason_result['lots'][0]['lotId'], 'L3') self.assertEqual(reason_result['lots'][0]['holdReason'], '品質確認') treemap_result = get_hold_detail_lots( reason=None, hold_type=None, treemap_reason='特殊需求管控', page=1, page_size=10, ) self.assertEqual(len(treemap_result['lots']), 1) self.assertEqual(treemap_result['lots'][0]['lotId'], 'L2') self.assertEqual(treemap_result['lots'][0]['holdReason'], '特殊需求管控') @patch('mes_dashboard.services.wip_service.get_cached_wip_data') def test_get_hold_detail_lots_includes_product_and_future_hold_comment(self, mock_cached_wip): mock_cached_wip.return_value = self._sample_hold_df() result = get_hold_detail_lots(reason='品質確認', page=1, page_size=10) lot = result['lots'][0] self.assertEqual(lot['product'], 'PROD-A') self.assertEqual(lot['futureHoldComment'], 'FC3') lot2 = result['lots'][1] self.assertEqual(lot2['product'], 'PROD-A') self.assertEqual(lot2['futureHoldComment'], 'FC1') @patch('mes_dashboard.services.wip_service.get_cached_wip_data') def test_get_wip_matrix_reason_filter_keeps_backward_compatibility(self, mock_cached_wip): mock_cached_wip.return_value = self._sample_hold_df() hold_quality_all = get_wip_matrix(status='HOLD', hold_type='quality') self.assertEqual(hold_quality_all['grand_total'], 200) hold_quality_reason = get_wip_matrix( status='HOLD', hold_type='quality', reason='品質確認', ) self.assertEqual(hold_quality_reason['grand_total'], 180) self.assertEqual(hold_quality_reason['workcenters'], ['WC-A']) @patch('mes_dashboard.services.wip_service.get_cached_wip_data') def test_get_hold_overview_treemap_groups_by_workcenter_and_reason(self, mock_cached_wip): mock_cached_wip.return_value = self._sample_hold_df() result = get_hold_overview_treemap(hold_type='quality') self.assertIsNotNone(result) items = result['items'] self.assertEqual(len(items), 2) expected = {(item['workcenter'], item['reason']): item for item in items} self.assertEqual(expected[('WC-A', '品質確認')]['lots'], 2) self.assertEqual(expected[('WC-A', '品質確認')]['qty'], 180) self.assertAlmostEqual(expected[('WC-A', '品質確認')]['avgAge'], 3.5) self.assertEqual(expected[('WC-C', '設備異常')]['lots'], 1) class TestHoldOverviewServiceOracleFallback(unittest.TestCase): """Test reason filtering behavior on Oracle fallback path.""" @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_matrix_oracle_applies_reason_for_hold_status(self, mock_read_sql): mock_read_sql.return_value = pd.DataFrame() get_wip_matrix(status='HOLD', reason='品質確認') call_args = mock_read_sql.call_args sql = call_args[0][0] params = call_args[0][1] if len(call_args[0]) > 1 else {} self.assertIn('HOLDREASONNAME', sql) self.assertTrue(any(v == '品質確認' for v in params.values())) @disable_cache @patch('mes_dashboard.services.wip_service.read_sql_df') def test_get_wip_matrix_oracle_ignores_reason_for_non_hold_status(self, mock_read_sql): mock_read_sql.return_value = pd.DataFrame() get_wip_matrix(status='RUN', reason='品質確認') call_args = mock_read_sql.call_args sql = call_args[0][0] self.assertNotIn('HOLDREASONNAME', sql) import pytest class TestWipServiceIntegration: """Integration tests that hit the actual database. These tests are skipped by default. Run with: python -m pytest tests/test_wip_service.py -k Integration --run-integration """ @pytest.fixture(autouse=True) def _reset_db_state(self): """Reset DB engine and circuit breaker so integration tests start clean.""" import mes_dashboard.core.database as _db from mes_dashboard.core.circuit_breaker import get_database_circuit_breaker _db._ENGINE = None get_database_circuit_breaker().reset() @pytest.mark.integration def test_get_wip_summary_integration(self): """Integration test for get_wip_summary.""" result = get_wip_summary() assert result is not None assert result['totalLots'] > 0 assert 'dataUpdateDate' in result @pytest.mark.integration def test_get_wip_matrix_integration(self): """Integration test for get_wip_matrix.""" result = get_wip_matrix() assert result is not None assert len(result['workcenters']) > 0 assert result['grand_total'] > 0 @pytest.mark.integration def test_get_wip_hold_summary_integration(self): """Integration test for get_wip_hold_summary.""" result = get_wip_hold_summary() assert result is not None assert 'items' in result @pytest.mark.integration def test_get_wip_detail_integration(self): """Integration test for get_wip_detail.""" # First get a valid workcenter workcenters = get_workcenters() assert workcenters is not None and len(workcenters) > 0 wc_name = workcenters[0]['name'] result = get_wip_detail(wc_name, page=1, page_size=10) assert result is not None assert result['workcenter'] == wc_name assert 'summary' in result assert 'lots' in result assert 'pagination' in result @pytest.mark.integration def test_get_workcenters_integration(self): """Integration test for get_workcenters.""" result = get_workcenters() assert result is not None assert len(result) > 0 assert 'name' in result[0] assert 'lot_count' in result[0] @pytest.mark.integration def test_get_packages_integration(self): """Integration test for get_packages.""" result = get_packages() assert result is not None assert len(result) > 0 assert 'name' in result[0] assert 'lot_count' in result[0] @pytest.mark.integration def test_search_workorders_integration(self): """Integration test for search_workorders.""" # Use a common prefix that likely exists result = search_workorders('GA') assert result is not None # Should return a list (possibly empty if no GA* workorders) assert isinstance(result, list) @pytest.mark.integration def test_search_lot_ids_integration(self): """Integration test for search_lot_ids.""" # Use a common prefix that likely exists result = search_lot_ids('GA') assert result is not None assert isinstance(result, list) @pytest.mark.integration def test_dummy_exclusion_integration(self): """Integration test to verify DUMMY exclusion works.""" # Get summary with and without DUMMY result_without_dummy = get_wip_summary(include_dummy=False) result_with_dummy = get_wip_summary(include_dummy=True) assert result_without_dummy is not None assert result_with_dummy is not None # If there are DUMMY lots, with_dummy should have more # (or equal if no DUMMY lots exist) assert result_with_dummy['totalLots'] >= result_without_dummy['totalLots'] @pytest.mark.integration def test_workorder_filter_integration(self): """Integration test for workorder filter.""" # Get all data first all_result = get_wip_summary() assert all_result is not None # Search for a workorder that exists workorders = search_workorders('GA', limit=1) if workorders and len(workorders) > 0: # Filter by that workorder filtered_result = get_wip_summary(workorder=workorders[0]) assert filtered_result is not None # Filtered count should be less than or equal to total assert filtered_result['totalLots'] <= all_result['totalLots'] if __name__ == "__main__": unittest.main()