diff --git a/tests/conftest.py b/tests/conftest.py index 6d9dc18..549a2ca 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,6 +20,32 @@ os.environ.setdefault('WATCHDOG_RESTART_FLAG', os.path.join(_TMP_DIR, 'mes_dashb os.environ.setdefault('WATCHDOG_PID_FILE', os.path.join(_TMP_DIR, 'gunicorn.pid')) os.environ.setdefault('WATCHDOG_STATE_FILE', os.path.join(_TMP_DIR, 'mes_dashboard_restart_state.json')) + +def _load_dotenv_if_present(): + """Load .env file into os.environ for integration/e2e tests. + + Uses setdefault so explicit env vars (e.g. FLASK_ENV=testing set above) + take precedence over .env values. + """ + env_path = os.path.join(_PROJECT_ROOT, '.env') + if not os.path.isfile(env_path): + return + with open(env_path) as f: + for line in f: + line = line.strip() + if not line or line.startswith('#') or '=' not in line: + continue + key, value = line.split('=', 1) + os.environ.setdefault(key.strip(), value.strip()) + + +# When --run-integration or --run-e2e is passed, load .env BEFORE database module +# import so DB_HOST/DB_USER etc. are available for CONNECTION_STRING construction. +# The database config module explicitly skips .env under pytest (to isolate unit tests), +# so we pre-populate the env vars here for integration/e2e runs. +if '--run-integration' in sys.argv or '--run-e2e' in sys.argv: + _load_dotenv_if_present() + import mes_dashboard.core.database as db from mes_dashboard.app import create_app from mes_dashboard.core.modernization_policy import clear_modernization_policy_cache diff --git a/tests/e2e/test_query_tool_e2e.py b/tests/e2e/test_query_tool_e2e.py index aba32c5..616f6c5 100644 --- a/tests/e2e/test_query_tool_e2e.py +++ b/tests/e2e/test_query_tool_e2e.py @@ -241,7 +241,7 @@ class TestQueryToolBackendIntegration: }) assert lineage_resp.status_code == 200 payload = lineage_resp.json() - assert "ancestors" in payload or "data" in payload + assert "children_map" in payload or "ancestors" in payload or "data" in payload # --------------------------------------------------------------------------- @@ -276,12 +276,13 @@ class TestQueryToolPageE2E: page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="commit", timeout=60000) page.wait_for_timeout(2000) - # Select work_order input type - select = page.locator("select") + # Select work_order input type (use .first – the Lot tab's QueryBar + # is rendered before the Reverse tab's QueryBar via v-show) + select = page.locator("select.query-tool-select").first select.select_option("work_order") # Enter work order in textarea - textarea = page.locator("textarea") + textarea = page.locator("textarea.query-tool-textarea").first textarea.fill("GA26010001") # Collect API responses during resolve @@ -295,8 +296,8 @@ class TestQueryToolPageE2E: page.on("response", handle_response) - # Click resolve button - resolve_btn = page.locator("button", has_text="解析") + # Click resolve button (use .first – Lot tab's button appears first via v-show) + resolve_btn = page.locator("button", has_text="解析").first resolve_btn.click() # Wait for resolve + lineage responses @@ -340,10 +341,10 @@ class TestQueryToolPageE2E: page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="commit", timeout=60000) page.wait_for_timeout(3000) - # Select work_order and resolve - page.locator("select").select_option("work_order") - page.locator("textarea").fill("GA26010001") - page.locator("button", has_text="解析").click() + # Select work_order and resolve (use .first to target Lot tab's QueryBar) + page.locator("select.query-tool-select").first.select_option("work_order") + page.locator("textarea.query-tool-textarea").first.fill("GA26010001") + page.locator("button", has_text="解析").first.click() # Wait for resolve + lineage + detail loading resolve_done = _wait_for_api_response(page, "/api/query-tool/resolve", timeout_seconds=60) @@ -385,8 +386,8 @@ class TestQueryToolPageE2E: page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="commit", timeout=60000) page.wait_for_timeout(1500) - # Enter text in LOT tab - textarea = page.locator("textarea") + # Enter text in LOT tab (use .first to target Lot tab's QueryBar) + textarea = page.locator("textarea.query-tool-textarea").first textarea.fill("GA26010001") # Switch to equipment tab @@ -408,9 +409,9 @@ class TestQueryToolPageE2E: page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="commit", timeout=60000) page.wait_for_timeout(1500) - page.locator("select").select_option("work_order") - page.locator("textarea").fill("GA26010001") - page.locator("button", has_text="解析").click() + page.locator("select.query-tool-select").first.select_option("work_order") + page.locator("textarea.query-tool-textarea").first.fill("GA26010001") + page.locator("button", has_text="解析").first.click() # Wait for resolve + lineage page.wait_for_timeout(8000) @@ -433,9 +434,9 @@ class TestQueryToolPageE2E: page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="commit", timeout=60000) page.wait_for_timeout(1500) - page.locator("select").select_option("work_order") - page.locator("textarea").fill("GA26010001") - page.locator("button", has_text="解析").click() + page.locator("select.query-tool-select").first.select_option("work_order") + page.locator("textarea.query-tool-textarea").first.fill("GA26010001") + page.locator("button", has_text="解析").first.click() # Wait for resolve + detail load page.wait_for_timeout(8000) @@ -467,9 +468,9 @@ class TestQueryToolFullFlowE2E: page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="commit", timeout=60000) page.wait_for_timeout(2000) - # Step 1: Configure input - page.locator("select").select_option("work_order") - page.locator("textarea").fill("GA26010001") + # Step 1: Configure input (use .first to target Lot tab's QueryBar) + page.locator("select.query-tool-select").first.select_option("work_order") + page.locator("textarea.query-tool-textarea").first.fill("GA26010001") # Step 2: Track all API calls api_calls = {} @@ -489,8 +490,8 @@ class TestQueryToolFullFlowE2E: page.on("response", track_response) - # Step 3: Click resolve - page.locator("button", has_text="解析").click() + # Step 3: Click resolve (use .first – Lot tab's button appears first via v-show) + page.locator("button", has_text="解析").first.click() # Step 4: Wait for cascade of API calls deadline = time.time() + 90 diff --git a/tests/e2e/test_resource_history_e2e.py b/tests/e2e/test_resource_history_e2e.py index 26f359c..7e47aeb 100644 --- a/tests/e2e/test_resource_history_e2e.py +++ b/tests/e2e/test_resource_history_e2e.py @@ -1,39 +1,39 @@ -# -*- coding: utf-8 -*- -"""End-to-end tests for resource history analysis page. - -These tests simulate real user workflows through the resource history analysis feature. -Run with: pytest tests/e2e/test_resource_history_e2e.py -v --run-integration -""" - +# -*- coding: utf-8 -*- +"""End-to-end tests for resource history analysis page. + +These tests simulate real user workflows through the resource history analysis feature. +Run with: pytest tests/e2e/test_resource_history_e2e.py -v --run-integration +""" + import json import pytest from unittest.mock import patch import pandas as pd from datetime import datetime - -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src')) - -import mes_dashboard.core.database as db -from mes_dashboard.app import create_app - - -@pytest.fixture -def app(): - """Create application for testing.""" - db._ENGINE = None - app = create_app('testing') - app.config['TESTING'] = True - return app - - -@pytest.fixture -def client(app): - """Create test client.""" - return app.test_client() - - + +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src')) + +import mes_dashboard.core.database as db +from mes_dashboard.app import create_app + + +@pytest.fixture +def app(): + """Create application for testing.""" + db._ENGINE = None + app = create_app('testing') + app.config['TESTING'] = True + return app + + +@pytest.fixture +def client(app): + """Create test client.""" + return app.test_client() + + class TestResourceHistoryPageAccess: """E2E tests for page access and navigation.""" @@ -76,8 +76,8 @@ class TestResourceHistoryPageAccess: else: assert '/static/dist/resource-history.js' in content assert 'type="module"' in content - - + + class TestResourceHistoryAPIWorkflow: """E2E tests for API workflows.""" @@ -92,20 +92,23 @@ class TestResourceHistoryAPIWorkflow: ], 'families': ['FAM001', 'FAM002'], } - - response = client.get('/api/resource/history/options') - - assert response.status_code == 200 - data = json.loads(response.data) - assert data['success'] is True - assert 'workcenter_groups' in data['data'] - assert 'families' in data['data'] - - @patch('mes_dashboard.services.resource_history_service._get_filtered_resources') - @patch('mes_dashboard.services.resource_history_service.read_sql_df') - def test_complete_query_workflow(self, mock_read_sql, mock_resources, client): - """Complete query workflow should return all data sections.""" - mock_resources.return_value = [ + + response = client.get('/api/resource/history/options') + + assert response.status_code == 200 + data = json.loads(response.data) + assert data['success'] is True + assert 'workcenter_groups' in data['data'] + assert 'families' in data['data'] + + @patch('mes_dashboard.services.resource_dataset_cache._get_workcenter_mapping') + @patch('mes_dashboard.services.resource_dataset_cache._get_resource_lookup') + @patch('mes_dashboard.services.resource_dataset_cache.read_sql_df') + @patch('mes_dashboard.services.resource_dataset_cache._get_filtered_resources_and_lookup') + def test_complete_query_workflow(self, mock_res_lookup, mock_read_sql, + mock_get_lookup, mock_get_wc, client): + """Complete query workflow via POST /query should return summary + detail.""" + resources = [ { 'RESOURCEID': 'RES001', 'WORKCENTERNAME': '焊接_DB', @@ -119,73 +122,73 @@ class TestResourceHistoryAPIWorkflow: 'RESOURCENAME': 'RES002', }, ] + resource_lookup = {r['RESOURCEID']: r for r in resources} + mock_res_lookup.return_value = ( + resources, + resource_lookup, + "HISTORYID IN ('RES001', 'RES002')", + ) + mock_get_lookup.return_value = resource_lookup + mock_get_wc.return_value = { + '焊接_DB': {'group': '焊接_DB', 'sequence': 1}, + '成型': {'group': '成型', 'sequence': 4}, + } - # Mock responses for the 3 queries in query_summary - kpi_df = pd.DataFrame([{ - 'PRD_HOURS': 8000, 'SBY_HOURS': 1000, 'UDT_HOURS': 500, - 'SDT_HOURS': 300, 'EGT_HOURS': 200, 'NST_HOURS': 1000, - 'MACHINE_COUNT': 100 - }]) - - trend_df = pd.DataFrame([ - {'DATA_DATE': datetime(2024, 1, 1), 'PRD_HOURS': 1000, 'SBY_HOURS': 100, - 'UDT_HOURS': 50, 'SDT_HOURS': 30, 'EGT_HOURS': 20, 'NST_HOURS': 100, 'MACHINE_COUNT': 100}, - {'DATA_DATE': datetime(2024, 1, 2), 'PRD_HOURS': 1100, 'SBY_HOURS': 90, - 'UDT_HOURS': 40, 'SDT_HOURS': 25, 'EGT_HOURS': 15, 'NST_HOURS': 100, 'MACHINE_COUNT': 100}, - ]) - - heatmap_raw_df = pd.DataFrame([ + # Base facts DataFrame (per-resource × per-day, single Oracle query) + base_df = pd.DataFrame([ {'HISTORYID': 'RES001', 'DATA_DATE': datetime(2024, 1, 1), - 'PRD_HOURS': 400, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10, 'NST_HOURS': 20}, + 'PRD_HOURS': 4000, 'SBY_HOURS': 500, 'UDT_HOURS': 250, + 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'NST_HOURS': 500, 'TOTAL_HOURS': 5500}, {'HISTORYID': 'RES002', 'DATA_DATE': datetime(2024, 1, 1), - 'PRD_HOURS': 600, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10, 'NST_HOURS': 30}, + 'PRD_HOURS': 4000, 'SBY_HOURS': 500, 'UDT_HOURS': 250, + 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'NST_HOURS': 500, 'TOTAL_HOURS': 5500}, ]) + mock_read_sql.return_value = base_df - # Use function-based side_effect for ThreadPoolExecutor parallel queries - def mock_sql(sql, _params=None): - sql_upper = sql.upper() - if 'HISTORYID' in sql_upper and 'DATA_DATE' in sql_upper: - return heatmap_raw_df - elif 'DATA_DATE' in sql_upper: - return trend_df - else: - return kpi_df + response = client.post( + '/api/resource/history/query', + json={ + 'start_date': '2024-01-01', + 'end_date': '2024-01-07', + 'granularity': 'day', + }, + ) + + assert response.status_code == 200 + data = json.loads(response.data) + assert data['success'] is True + assert 'query_id' in data + + # Verify KPI (derived from base_df) + # Total PRD=8000, SBY=1000, UDT=500, SDT=300, EGT=200 + # OU% = 8000/(8000+1000+500+300+200)*100 = 80.0 + assert data['summary']['kpi']['ou_pct'] == 80.0 + # Availability% = (8000+1000+200)/(8000+1000+200+300+500+1000)*100 = 83.6 + assert data['summary']['kpi']['availability_pct'] == 83.6 + assert data['summary']['kpi']['machine_count'] == 2 + + # Verify trend (one period since both rows are same date) + assert len(data['summary']['trend']) >= 1 + assert 'availability_pct' in data['summary']['trend'][0] - mock_read_sql.side_effect = mock_sql - - response = client.get( - '/api/resource/history/summary' - '?start_date=2024-01-01' - '&end_date=2024-01-07' - '&granularity=day' - ) - - assert response.status_code == 200 - data = json.loads(response.data) - assert data['success'] is True - - # Verify KPI - assert data['data']['kpi']['ou_pct'] == 80.0 - # Availability% = (8000+1000+200) / (8000+1000+200+300+500+1000) * 100 = 9200/11000 = 83.6% - assert data['data']['kpi']['availability_pct'] == 83.6 - assert data['data']['kpi']['machine_count'] == 100 - - # Verify trend - assert len(data['data']['trend']) == 2 - # Trend should also have availability_pct - assert 'availability_pct' in data['data']['trend'][0] - # Verify heatmap - assert len(data['data']['heatmap']) == 2 + assert len(data['summary']['heatmap']) >= 1 # Verify comparison - assert len(data['data']['workcenter_comparison']) == 2 + assert len(data['summary']['workcenter_comparison']) == 2 - @patch('mes_dashboard.services.resource_history_service._get_filtered_resources') - @patch('mes_dashboard.services.resource_history_service.read_sql_df') - def test_detail_query_workflow(self, mock_read_sql, mock_resources, client): - """Detail query workflow should return hierarchical data.""" - mock_resources.return_value = [ + # Verify detail + assert data['detail']['total'] == 2 + assert len(data['detail']['data']) == 2 + + @patch('mes_dashboard.services.resource_dataset_cache._get_workcenter_mapping') + @patch('mes_dashboard.services.resource_dataset_cache._get_resource_lookup') + @patch('mes_dashboard.services.resource_dataset_cache.read_sql_df') + @patch('mes_dashboard.services.resource_dataset_cache._get_filtered_resources_and_lookup') + def test_detail_query_workflow(self, mock_res_lookup, mock_read_sql, + mock_get_lookup, mock_get_wc, client): + """Detail query via POST /query should return hierarchical data.""" + resources = [ { 'RESOURCEID': 'RES001', 'WORKCENTERNAME': '焊接_DB', @@ -199,41 +202,52 @@ class TestResourceHistoryAPIWorkflow: 'RESOURCENAME': 'RES002', }, ] + resource_lookup = {r['RESOURCEID']: r for r in resources} + mock_res_lookup.return_value = ( + resources, + resource_lookup, + "HISTORYID IN ('RES001', 'RES002')", + ) + mock_get_lookup.return_value = resource_lookup + mock_get_wc.return_value = { + '焊接_DB': {'group': '焊接_DB', 'sequence': 1}, + } - detail_df = pd.DataFrame([ - {'HISTORYID': 'RES001', + base_df = pd.DataFrame([ + {'HISTORYID': 'RES001', 'DATA_DATE': datetime(2024, 1, 1), 'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'TOTAL_HOURS': 110}, - {'HISTORYID': 'RES002', + {'HISTORYID': 'RES002', 'DATA_DATE': datetime(2024, 1, 1), 'PRD_HOURS': 75, 'SBY_HOURS': 15, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'TOTAL_HOURS': 110}, ]) - - mock_read_sql.return_value = detail_df - - response = client.get( - '/api/resource/history/detail' - '?start_date=2024-01-01' - '&end_date=2024-01-07' - ) - - assert response.status_code == 200 - data = json.loads(response.data) - assert data['success'] is True - assert data['total'] == 2 - assert len(data['data']) == 2 - assert data['truncated'] is False - - # Verify data structure - first_row = data['data'][0] - assert 'workcenter' in first_row - assert 'family' in first_row - assert 'resource' in first_row - assert 'ou_pct' in first_row - assert 'availability_pct' in first_row - assert 'prd_hours' in first_row - assert 'prd_pct' in first_row - + mock_read_sql.return_value = base_df + + response = client.post( + '/api/resource/history/query', + json={ + 'start_date': '2024-01-01', + 'end_date': '2024-01-07', + }, + ) + + assert response.status_code == 200 + data = json.loads(response.data) + assert data['success'] is True + assert data['detail']['total'] == 2 + assert len(data['detail']['data']) == 2 + assert data['detail']['truncated'] is False + + # Verify data structure + first_row = data['detail']['data'][0] + assert 'workcenter' in first_row + assert 'family' in first_row + assert 'resource' in first_row + assert 'ou_pct' in first_row + assert 'availability_pct' in first_row + assert 'prd_hours' in first_row + assert 'prd_pct' in first_row + @patch('mes_dashboard.services.resource_history_service._get_filtered_resources') @patch('mes_dashboard.services.resource_history_service.read_sql_df') def test_export_workflow(self, mock_read_sql, mock_resources, client): @@ -251,101 +265,103 @@ class TestResourceHistoryAPIWorkflow: 'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'TOTAL_HOURS': 110}, ]) - - response = client.get( - '/api/resource/history/export' - '?start_date=2024-01-01' - '&end_date=2024-01-07' - ) - - assert response.status_code == 200 - assert 'text/csv' in response.content_type - - content = response.data.decode('utf-8-sig') - lines = content.strip().split('\n') - - # Should have header + data rows - assert len(lines) >= 2 - - # Verify header - header = lines[0] - assert '站點' in header - assert 'OU%' in header - assert 'Availability%' in header - - -class TestResourceHistoryValidation: - """E2E tests for input validation.""" - - def test_date_range_validation(self, client): - """Date range exceeding 730 days should be rejected.""" - response = client.get( - '/api/resource/history/summary' - '?start_date=2024-01-01' - '&end_date=2026-01-02' - ) - - assert response.status_code == 400 - data = json.loads(response.data) - assert data['success'] is False - assert '730' in data['error'] - - def test_missing_required_params(self, client): - """Missing required parameters should return error.""" - response = client.get('/api/resource/history/summary') - - assert response.status_code == 400 - data = json.loads(response.data) - assert data['success'] is False - - @patch('mes_dashboard.services.resource_history_service._get_filtered_resources') - @patch('mes_dashboard.services.resource_history_service.read_sql_df') - def test_granularity_options(self, mock_read_sql, mock_resources, client): - """Different granularity options should work.""" - mock_resources.return_value = [{ + + response = client.get( + '/api/resource/history/export' + '?start_date=2024-01-01' + '&end_date=2024-01-07' + ) + + assert response.status_code == 200 + assert 'text/csv' in response.content_type + + content = response.data.decode('utf-8-sig') + lines = content.strip().split('\n') + + # Should have header + data rows + assert len(lines) >= 2 + + # Verify header + header = lines[0] + assert '站點' in header + assert 'OU%' in header + assert 'Availability%' in header + + +class TestResourceHistoryValidation: + """E2E tests for input validation.""" + + def test_date_range_validation(self, client): + """Inverted date range (end_date < start_date) should be rejected.""" + response = client.post( + '/api/resource/history/query', + json={ + 'start_date': '2026-01-02', + 'end_date': '2024-01-01', + }, + ) + + assert response.status_code == 400 + data = json.loads(response.data) + assert data['success'] is False + + def test_missing_required_params(self, client): + """Missing required parameters should return error.""" + response = client.post( + '/api/resource/history/query', + json={}, + ) + + assert response.status_code == 400 + data = json.loads(response.data) + assert data['success'] is False + + @patch('mes_dashboard.services.resource_dataset_cache._get_workcenter_mapping') + @patch('mes_dashboard.services.resource_dataset_cache._get_resource_lookup') + @patch('mes_dashboard.services.resource_dataset_cache.read_sql_df') + @patch('mes_dashboard.services.resource_dataset_cache._get_filtered_resources_and_lookup') + def test_granularity_options(self, mock_res_lookup, mock_read_sql, + mock_get_lookup, mock_get_wc, client): + """Different granularity options should work via POST /query.""" + resources = [{ 'RESOURCEID': 'RES001', 'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES001', }] - kpi_df = pd.DataFrame([{ - 'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5, - 'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'MACHINE_COUNT': 5 - }]) - trend_df = pd.DataFrame([{ - 'DATA_DATE': datetime(2024, 1, 1), - 'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5, - 'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, - 'MACHINE_COUNT': 5 - }]) - heatmap_raw_df = pd.DataFrame([{ + resource_lookup = {r['RESOURCEID']: r for r in resources} + mock_res_lookup.return_value = ( + resources, + resource_lookup, + "HISTORYID IN ('RES001')", + ) + mock_get_lookup.return_value = resource_lookup + mock_get_wc.return_value = { + '焊接_DB': {'group': '焊接_DB', 'sequence': 1}, + } + + base_df = pd.DataFrame([{ 'HISTORYID': 'RES001', 'DATA_DATE': datetime(2024, 1, 1), 'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5, - 'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10 + 'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, + 'TOTAL_HOURS': 130, }]) + mock_read_sql.return_value = base_df for granularity in ['day', 'week', 'month', 'year']: - def mock_sql(sql, _params=None): - sql_upper = sql.upper() - if 'HISTORYID' in sql_upper and 'DATA_DATE' in sql_upper: - return heatmap_raw_df - if 'DATA_DATE' in sql_upper: - return trend_df - return kpi_df + response = client.post( + '/api/resource/history/query', + json={ + 'start_date': '2024-01-01', + 'end_date': '2024-01-31', + 'granularity': granularity, + }, + ) + + assert response.status_code == 200, f"Failed for granularity={granularity}" - mock_read_sql.side_effect = mock_sql - response = client.get( - f'/api/resource/history/summary' - f'?start_date=2024-01-01' - f'&end_date=2024-01-31' - f'&granularity={granularity}' - ) - - assert response.status_code == 200, f"Failed for granularity={granularity}" - - class TestResourceHistoryNavigation: """E2E tests for navigation integration.""" @@ -368,7 +384,7 @@ class TestResourceHistoryNavigation: content = response.data.decode('utf-8') assert '設備歷史績效' in content assert 'resourceHistoryFrame' in content - - -if __name__ == '__main__': - pytest.main([__file__, '-v']) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/tests/e2e/test_trace_pipeline_e2e.py b/tests/e2e/test_trace_pipeline_e2e.py new file mode 100644 index 0000000..3634fcf --- /dev/null +++ b/tests/e2e/test_trace_pipeline_e2e.py @@ -0,0 +1,728 @@ +# -*- coding: utf-8 -*- +"""E2E tests for trace pipeline: memory triage, async job queue, NDJSON streaming. + +Tests the three core features implemented in the trace pipeline proposals: + 1. Memory triage — admission control, CID limits, MSD bypass + 2. Async job queue — RQ-based async routing, job lifecycle + 3. NDJSON streaming — chunked Redis storage, streaming protocol + +Run with: pytest tests/e2e/test_trace_pipeline_e2e.py -v --run-e2e +""" + +import json +import os +import time +import uuid + +import pytest +import redis +import requests + +pytestmark = [pytest.mark.e2e] + +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") +REDIS_KEY_PREFIX = os.getenv("REDIS_KEY_PREFIX", "mes_wip") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def _post_events(base_url, profile, container_ids, domains=None, timeout=60): + payload = {"profile": profile, "container_ids": container_ids} + if domains: + payload["domains"] = domains + return requests.post( + f"{base_url}/api/trace/events", json=payload, timeout=timeout, + ) + + +def _resolve_cids(base_url, work_order): + """Resolve real container IDs from a work order via live API.""" + resp = requests.post( + f"{base_url}/api/query-tool/resolve", + json={"input_type": "work_order", "values": [work_order]}, + timeout=30, + ) + if resp.status_code != 200: + return [] + data = resp.json() + lots = data.get("data", []) + return [ + str(lot.get("container_id") or lot.get("CONTAINERID") or "") + for lot in lots + if lot.get("container_id") or lot.get("CONTAINERID") + ] + + +def _get_redis(): + """Get a direct Redis client for seeding test data.""" + return redis.from_url(REDIS_URL, decode_responses=True) + + +def _key(suffix): + return f"{REDIS_KEY_PREFIX}:{suffix}" + + +def _seed_completed_job(r, job_id, profile, domain_data, aggregation=None, + failed_domains=None, batch_size=3): + """Seed Redis with a completed job's chunked result for streaming tests. + + Args: + r: Redis client + job_id: Job identifier + profile: Profile name + domain_data: dict of {domain_name: [list of record dicts]} + aggregation: optional aggregation dict + failed_domains: list of failed domain names + batch_size: records per chunk (small for testing) + """ + ttl = 300 # 5 min TTL for test data + + # Job meta (hash) + meta_key = _key(f"trace:job:{job_id}:meta") + r.hset(meta_key, mapping={ + "profile": profile, + "cid_count": "100", + "domains": ",".join(domain_data.keys()), + "status": "finished", + "progress": "done", + "created_at": str(time.time() - 10), + "completed_at": str(time.time()), + "error": "", + }) + r.expire(meta_key, ttl) + + # Chunked result storage + domain_info = {} + for domain_name, rows in domain_data.items(): + chunks = [ + rows[i:i + batch_size] + for i in range(0, max(len(rows), 1), batch_size) + ] if rows else [] + + for idx, chunk in enumerate(chunks): + chunk_key = _key(f"trace:job:{job_id}:result:{domain_name}:{idx}") + r.setex(chunk_key, ttl, json.dumps(chunk)) + + domain_info[domain_name] = {"chunks": len(chunks), "total": len(rows)} + + # Aggregation + if aggregation is not None: + agg_key = _key(f"trace:job:{job_id}:result:aggregation") + r.setex(agg_key, ttl, json.dumps(aggregation)) + + # Result meta + result_meta = { + "profile": profile, + "domains": domain_info, + "failed_domains": sorted(failed_domains) if failed_domains else [], + } + result_meta_key = _key(f"trace:job:{job_id}:result:meta") + r.setex(result_meta_key, ttl, json.dumps(result_meta)) + + +def _cleanup_job(r, job_id): + """Remove all Redis keys for a test job.""" + pattern = _key(f"trace:job:{job_id}:*") + keys = list(r.scan_iter(pattern)) + if keys: + r.delete(*keys) + + +def _parse_ndjson(response_text): + """Parse NDJSON response text into list of dicts.""" + lines = [] + for line in response_text.strip().split("\n"): + line = line.strip() + if line: + lines.append(json.loads(line)) + return lines + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- +@pytest.fixture(scope="module") +def base(app_server): + return app_server + + +@pytest.fixture(scope="module") +def real_cids(base): + """Resolve real CIDs from a known work order.""" + cids = _resolve_cids(base, "GA26010001") + if not cids: + pytest.skip("No container IDs resolved — cannot test trace pipeline") + return cids + + +@pytest.fixture(scope="module") +def rclient(): + """Direct Redis client for seeding/cleanup.""" + r = _get_redis() + try: + r.ping() + except redis.ConnectionError: + pytest.skip("Redis not available") + return r + + +# =========================================================================== +# 1. Memory Triage — Admission Control +# =========================================================================== +class TestTraceAdmissionControl: + """Verify admission control: CID limits, profile bypass, validation.""" + + def test_sync_response_with_small_cid_set(self, base, real_cids): + """Small CID count → sync 200 response with actual trace data.""" + small_cids = real_cids[:3] + resp = _post_events(base, "query_tool", small_cids, domains=["history"]) + + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text[:200]}" + data = resp.json() + assert data["stage"] == "events" + assert "results" in data + assert "history" in data["results"] + history = data["results"]["history"] + assert "data" in history + assert "count" in history + assert isinstance(history["data"], list) + # With real CIDs we should get actual history records + assert history["count"] >= 0 + + def test_sync_response_data_structure_complete(self, base, real_cids): + """Sync response has proper domain data structure with count/data keys.""" + resp = _post_events(base, "query_tool", real_cids[:5], + domains=["history", "materials"]) + assert resp.status_code == 200 + data = resp.json() + for domain in ["history", "materials"]: + assert domain in data["results"], f"Missing domain '{domain}'" + d = data["results"][domain] + assert "data" in d, f"Domain '{domain}' missing 'data'" + assert "count" in d, f"Domain '{domain}' missing 'count'" + assert d["count"] == len(d["data"]) + + def test_cid_limit_exceeded_non_msd_returns_413_or_202(self, base): + """Non-MSD profile with > CID_LIMIT → 413 (no async) or 202 (async).""" + cid_limit = int(os.getenv("TRACE_EVENTS_CID_LIMIT", "50000")) + # Generate fake CIDs that exceed the limit + fake_cids = [f"FAKE-{i:06x}" for i in range(cid_limit + 1)] + + resp = _post_events(base, "query_tool", fake_cids, domains=["history"]) + + assert resp.status_code in (413, 202), ( + f"Expected 413 or 202 for {cid_limit + 1} CIDs, got {resp.status_code}" + ) + if resp.status_code == 413: + err = resp.json()["error"] + assert err["code"] == "CID_LIMIT_EXCEEDED" + assert str(cid_limit) in err["message"] + + def test_msd_profile_bypasses_cid_limit(self, base): + """MSD profile must NOT return 413 even with > CID_LIMIT CIDs. + + MSD requires all CIDs for accurate aggregation — no hard cutoff. + With async available, large MSD queries should route to 202. + Without async, they proceed to sync (may be slow but not rejected). + """ + async_threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000")) + # Use just above async threshold — enough to trigger async routing + # but below CID_LIMIT to keep test fast + fake_cids = [f"MSD-{i:06x}" for i in range(async_threshold + 1)] + + resp = _post_events(base, "mid_section_defect", fake_cids, + domains=["rejects"]) + + # Must NOT be 413 — MSD bypasses CID limit + assert resp.status_code != 413, ( + "MSD profile should NEVER receive 413 CID_LIMIT_EXCEEDED" + ) + # Should be 202 (async) or 200 (sync fallback) + assert resp.status_code in (200, 202) + + def test_empty_container_ids_rejected(self, base): + """Empty container_ids list → 400 INVALID_PARAMS.""" + resp = _post_events(base, "query_tool", []) + assert resp.status_code == 400 + assert resp.json()["error"]["code"] == "INVALID_PARAMS" + + def test_missing_profile_rejected(self, base): + """Missing profile field → 400.""" + resp = requests.post( + f"{base}/api/trace/events", + json={"container_ids": ["CID-001"]}, + timeout=10, + ) + assert resp.status_code == 400 + + def test_invalid_domain_rejected(self, base): + """Invalid domain name → 400 INVALID_PARAMS.""" + resp = _post_events( + base, "query_tool", ["CID-001"], domains=["nonexistent_domain"], + ) + assert resp.status_code == 400 + assert "INVALID_PARAMS" in resp.json()["error"]["code"] + + +# =========================================================================== +# 2. Async Job Queue +# =========================================================================== +class TestTraceAsyncJobQueue: + """Verify async job routing, lifecycle, and result retrieval.""" + + def test_async_routing_returns_202_with_correct_format(self, base): + """Large CID count + async available → 202 with job metadata.""" + threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000")) + fake_cids = [f"ASYNC-{i:06x}" for i in range(threshold + 1)] + + resp = _post_events(base, "query_tool", fake_cids, domains=["history"]) + + if resp.status_code != 202: + pytest.skip("Async not available (RQ worker not running)") + + data = resp.json() + assert data["async"] is True + assert data["stage"] == "events" + assert "job_id" in data + assert data["job_id"].startswith("trace-evt-") + assert "status_url" in data + assert "stream_url" in data + assert data["status_url"] == f"/api/trace/job/{data['job_id']}" + assert data["stream_url"] == f"/api/trace/job/{data['job_id']}/stream" + + def test_job_status_after_enqueue(self, base): + """After async enqueue, job status should be queryable.""" + threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000")) + fake_cids = [f"STATUS-{i:06x}" for i in range(threshold + 1)] + + enqueue_resp = _post_events(base, "query_tool", fake_cids, + domains=["history"]) + if enqueue_resp.status_code != 202: + pytest.skip("Async not available") + + job_id = enqueue_resp.json()["job_id"] + + status_resp = requests.get( + f"{base}/api/trace/job/{job_id}", timeout=10, + ) + assert status_resp.status_code == 200 + status = status_resp.json() + assert status["job_id"] == job_id + assert status["status"] in ("queued", "started", "finished", "failed") + assert status["profile"] == "query_tool" + assert status["cid_count"] == threshold + 1 + assert "history" in status["domains"] + assert "elapsed_seconds" in status + + def test_job_lifecycle_poll_until_terminal(self, base): + """Full lifecycle: enqueue → poll until finished/failed → verify result.""" + threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000")) + fake_cids = [f"LIFE-{i:06x}" for i in range(threshold + 1)] + + enqueue_resp = _post_events(base, "query_tool", fake_cids, + domains=["history"]) + if enqueue_resp.status_code != 202: + pytest.skip("Async not available") + + job_id = enqueue_resp.json()["job_id"] + status_url = f"{base}/api/trace/job/{job_id}" + + # Poll until terminal state (max 120s — fake CIDs will fail fast) + terminal = False + final_status = None + deadline = time.time() + 120 + while time.time() < deadline: + resp = requests.get(status_url, timeout=10) + assert resp.status_code == 200 + final_status = resp.json() + if final_status["status"] in ("finished", "failed"): + terminal = True + break + time.sleep(2) + + assert terminal, f"Job did not reach terminal state within 120s, last: {final_status}" + + # Job with fake CIDs may finish with empty results or fail — + # either is acceptable. Key is that the lifecycle completed. + if final_status["status"] == "finished": + # Result should be retrievable + result_resp = requests.get( + f"{base}/api/trace/job/{job_id}/result", timeout=10, + ) + assert result_resp.status_code == 200 + result = result_resp.json() + assert result["stage"] == "events" + assert "results" in result + + def test_job_not_found_returns_404(self, base): + """Non-existent job → 404.""" + resp = requests.get( + f"{base}/api/trace/job/trace-evt-nonexistent99", timeout=10, + ) + assert resp.status_code == 404 + assert resp.json()["error"]["code"] == "JOB_NOT_FOUND" + + def test_job_result_not_found_returns_404(self, base): + """Non-existent job result → 404.""" + resp = requests.get( + f"{base}/api/trace/job/trace-evt-nonexistent99/result", timeout=10, + ) + assert resp.status_code == 404 + + def test_job_result_before_completion_returns_409(self, base, rclient): + """Result request for an in-progress job → 409.""" + job_id = f"trace-evt-inprogress{uuid.uuid4().hex[:6]}" + meta_key = _key(f"trace:job:{job_id}:meta") + rclient.hset(meta_key, mapping={ + "profile": "query_tool", + "cid_count": "100", + "domains": "history", + "status": "started", + "progress": "fetching", + "created_at": str(time.time()), + "completed_at": "", + "error": "", + }) + rclient.expire(meta_key, 60) + + try: + resp = requests.get( + f"{base}/api/trace/job/{job_id}/result", timeout=10, + ) + assert resp.status_code == 409 + assert resp.json()["error"]["code"] == "JOB_NOT_COMPLETE" + finally: + rclient.delete(meta_key) + + +# =========================================================================== +# 3. NDJSON Streaming +# =========================================================================== +class TestTraceNDJSONStream: + """Verify NDJSON streaming endpoint and protocol.""" + + def test_stream_not_found_returns_404(self, base): + """Stream for non-existent job → 404.""" + resp = requests.get( + f"{base}/api/trace/job/trace-evt-nonexistent99/stream", timeout=10, + ) + assert resp.status_code == 404 + + def test_stream_before_completion_returns_409(self, base, rclient): + """Stream request for an in-progress job → 409.""" + job_id = f"trace-evt-stream409{uuid.uuid4().hex[:6]}" + meta_key = _key(f"trace:job:{job_id}:meta") + rclient.hset(meta_key, mapping={ + "profile": "query_tool", + "cid_count": "50", + "domains": "history", + "status": "started", + "progress": "fetching", + "created_at": str(time.time()), + "completed_at": "", + "error": "", + }) + rclient.expire(meta_key, 60) + + try: + resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=10, + ) + assert resp.status_code == 409 + data = resp.json() + assert data["error"]["code"] == "JOB_NOT_COMPLETE" + assert data["status"] == "started" + finally: + rclient.delete(meta_key) + + def test_stream_protocol_single_domain(self, base, rclient): + """Stream a completed job with one domain — verify full NDJSON protocol.""" + job_id = f"trace-evt-stream1d{uuid.uuid4().hex[:6]}" + records = [{"CID": f"C{i}", "EVENT": f"ev{i}", "TS": "2026-01-01"} + for i in range(7)] + + _seed_completed_job(rclient, job_id, "query_tool", + {"history": records}, batch_size=3) + try: + resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=10, + ) + assert resp.status_code == 200 + assert "application/x-ndjson" in resp.headers["Content-Type"] + assert resp.headers.get("Cache-Control") == "no-cache" + + lines = _parse_ndjson(resp.text) + types = [ln["type"] for ln in lines] + + # Protocol: meta → domain_start → records(×3) → domain_end → complete + assert types[0] == "meta" + assert types[1] == "domain_start" + assert types[-2] == "domain_end" + assert types[-1] == "complete" + + # Verify meta line + meta = lines[0] + assert meta["job_id"] == job_id + assert meta["profile"] == "query_tool" + assert "history" in meta["domains"] + + # Verify domain_start + ds = lines[1] + assert ds["domain"] == "history" + assert ds["total"] == 7 + + # Verify records batches (7 records / batch_size=3 → 3 chunks) + record_lines = [ln for ln in lines if ln["type"] == "records"] + assert len(record_lines) == 3 # ceil(7/3) = 3 chunks + total_streamed = sum(ln["count"] for ln in record_lines) + assert total_streamed == 7 + + # Verify batch indices + batches = [ln["batch"] for ln in record_lines] + assert batches == [0, 1, 2] + + # Verify actual data content + all_records = [] + for ln in record_lines: + assert ln["domain"] == "history" + all_records.extend(ln["data"]) + assert len(all_records) == 7 + assert all_records[0]["CID"] == "C0" + assert all_records[6]["CID"] == "C6" + + # Verify domain_end count matches + de = [ln for ln in lines if ln["type"] == "domain_end"][0] + assert de["count"] == 7 + + # Verify complete + complete = lines[-1] + assert complete["total_records"] == 7 + finally: + _cleanup_job(rclient, job_id) + + def test_stream_protocol_multi_domain(self, base, rclient): + """Stream a completed job with multiple domains.""" + job_id = f"trace-evt-streammd{uuid.uuid4().hex[:6]}" + history = [{"CID": f"H{i}", "EVENT": "hist"} for i in range(5)] + materials = [{"CID": f"M{i}", "MAT": "mat"} for i in range(4)] + rejects = [{"CID": f"R{i}", "REJ": "rej"} for i in range(2)] + + _seed_completed_job(rclient, job_id, "query_tool", { + "history": history, + "materials": materials, + "rejects": rejects, + }, batch_size=3) + + try: + resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=10, + ) + assert resp.status_code == 200 + lines = _parse_ndjson(resp.text) + types = [ln["type"] for ln in lines] + + # Must start with meta and end with complete + assert types[0] == "meta" + assert types[-1] == "complete" + assert set(lines[0]["domains"]) == {"history", "materials", "rejects"} + + # Each domain must have domain_start → records → domain_end sequence + for domain_name, expected_total in [("history", 5), ("materials", 4), ("rejects", 2)]: + starts = [ln for ln in lines if ln["type"] == "domain_start" and ln["domain"] == domain_name] + ends = [ln for ln in lines if ln["type"] == "domain_end" and ln["domain"] == domain_name] + recs = [ln for ln in lines if ln["type"] == "records" and ln["domain"] == domain_name] + + assert len(starts) == 1, f"Expected 1 domain_start for {domain_name}" + assert len(ends) == 1, f"Expected 1 domain_end for {domain_name}" + assert starts[0]["total"] == expected_total + assert ends[0]["count"] == expected_total + assert sum(ln["count"] for ln in recs) == expected_total + + # Total records across all domains + complete = lines[-1] + assert complete["total_records"] == 11 # 5 + 4 + 2 + finally: + _cleanup_job(rclient, job_id) + + def test_stream_with_aggregation(self, base, rclient): + """Stream includes aggregation line for MSD profile.""" + job_id = f"trace-evt-streamagg{uuid.uuid4().hex[:6]}" + rejects = [{"CID": f"R{i}", "DEFECT": "scratch"} for i in range(4)] + aggregation = { + "total_defects": 42, + "by_category": {"scratch": 30, "crack": 12}, + } + + _seed_completed_job(rclient, job_id, "mid_section_defect", + {"rejects": rejects}, aggregation=aggregation, + batch_size=5) + try: + resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=10, + ) + assert resp.status_code == 200 + lines = _parse_ndjson(resp.text) + types = [ln["type"] for ln in lines] + + assert "aggregation" in types + agg_line = [ln for ln in lines if ln["type"] == "aggregation"][0] + assert agg_line["data"]["total_defects"] == 42 + assert agg_line["data"]["by_category"]["scratch"] == 30 + + # aggregation must come after domain_end and before complete + agg_idx = types.index("aggregation") + complete_idx = types.index("complete") + last_domain_end_idx = max( + i for i, t in enumerate(types) if t == "domain_end" + ) + assert last_domain_end_idx < agg_idx < complete_idx + finally: + _cleanup_job(rclient, job_id) + + def test_stream_with_failed_domains(self, base, rclient): + """Stream includes warning line when some domains failed.""" + job_id = f"trace-evt-streamfail{uuid.uuid4().hex[:6]}" + history = [{"CID": "C1", "EVENT": "ev1"}] + + _seed_completed_job(rclient, job_id, "query_tool", + {"history": history}, + failed_domains=["materials", "rejects"], + batch_size=5) + try: + resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=10, + ) + assert resp.status_code == 200 + lines = _parse_ndjson(resp.text) + types = [ln["type"] for ln in lines] + + assert "warning" in types + warning = [ln for ln in lines if ln["type"] == "warning"][0] + assert warning["code"] == "EVENTS_PARTIAL_FAILURE" + assert set(warning["failed_domains"]) == {"materials", "rejects"} + finally: + _cleanup_job(rclient, job_id) + + def test_stream_empty_domain(self, base, rclient): + """Stream handles domain with zero records gracefully.""" + job_id = f"trace-evt-streamempty{uuid.uuid4().hex[:6]}" + + _seed_completed_job(rclient, job_id, "query_tool", + {"history": []}, batch_size=5) + try: + resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=10, + ) + assert resp.status_code == 200 + lines = _parse_ndjson(resp.text) + types = [ln["type"] for ln in lines] + + assert "domain_start" in types + assert "domain_end" in types + ds = [ln for ln in lines if ln["type"] == "domain_start"][0] + de = [ln for ln in lines if ln["type"] == "domain_end"][0] + assert ds["total"] == 0 + assert de["count"] == 0 + + complete = lines[-1] + assert complete["total_records"] == 0 + finally: + _cleanup_job(rclient, job_id) + + def test_stream_content_matches_result_endpoint(self, base, rclient): + """Stream data must match what GET /result returns.""" + job_id = f"trace-evt-streammatch{uuid.uuid4().hex[:6]}" + records = [{"CID": f"C{i}", "VAL": i * 10} for i in range(8)] + + _seed_completed_job(rclient, job_id, "query_tool", + {"history": records}, batch_size=3) + try: + # Get via result endpoint + result_resp = requests.get( + f"{base}/api/trace/job/{job_id}/result", timeout=10, + ) + assert result_resp.status_code == 200 + result_data = result_resp.json() + + # Get via stream endpoint + stream_resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=10, + ) + assert stream_resp.status_code == 200 + lines = _parse_ndjson(stream_resp.text) + + # Collect all streamed records + streamed_records = [] + for ln in lines: + if ln["type"] == "records" and ln["domain"] == "history": + streamed_records.extend(ln["data"]) + + # Compare counts + result_history = result_data["results"]["history"] + assert len(streamed_records) == result_history["count"] + + # Compare actual data content + assert streamed_records == result_history["data"] + finally: + _cleanup_job(rclient, job_id) + + +# =========================================================================== +# 4. Full Async → Stream End-to-End +# =========================================================================== +class TestTraceAsyncToStream: + """Full end-to-end: POST events → async 202 → poll → stream NDJSON.""" + + def test_full_async_lifecycle_with_stream(self, base, real_cids): + """Complete flow: real CIDs → async → poll → stream → verify data. + + Uses real CIDs but requires TRACE_ASYNC_CID_THRESHOLD to be low enough + or enough CIDs. If async not triggered, test sync+seed stream instead. + """ + threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000")) + + if len(real_cids) <= threshold: + # Not enough CIDs to trigger async — test sync path instead + # and verify stream works for the seeded result + resp = _post_events(base, "query_tool", real_cids[:10], + domains=["history"]) + assert resp.status_code == 200 + data = resp.json() + assert data["stage"] == "events" + assert "history" in data["results"] + # Sync path proven — stream is tested in TestTraceNDJSONStream + return + + # If we have enough CIDs, test full async lifecycle + resp = _post_events(base, "query_tool", real_cids, domains=["history"]) + assert resp.status_code == 202 + job_id = resp.json()["job_id"] + + # Poll until finished + deadline = time.time() + 180 + final_status = None + while time.time() < deadline: + status_resp = requests.get( + f"{base}/api/trace/job/{job_id}", timeout=10, + ) + final_status = status_resp.json() + if final_status["status"] in ("finished", "failed"): + break + time.sleep(2) + + assert final_status["status"] == "finished", ( + f"Job did not finish: {final_status}" + ) + + # Stream the result + stream_resp = requests.get( + f"{base}/api/trace/job/{job_id}/stream", timeout=30, + ) + assert stream_resp.status_code == 200 + lines = _parse_ndjson(stream_resp.text) + + # Verify protocol integrity + assert lines[0]["type"] == "meta" + assert lines[-1]["type"] == "complete" + assert lines[-1]["total_records"] > 0 diff --git a/tests/test_cache_integration.py b/tests/test_cache_integration.py index c67fe32..98cc0da 100644 --- a/tests/test_cache_integration.py +++ b/tests/test_cache_integration.py @@ -28,7 +28,9 @@ class TestHealthEndpoint: @patch('mes_dashboard.routes.health_routes.check_database') @patch('mes_dashboard.routes.health_routes.check_redis') @patch('mes_dashboard.routes.health_routes.get_cache_status') - def test_health_all_ok(self, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache): + @patch('mes_dashboard.routes.health_routes.get_route_cache_status', return_value={'mode': 'none', 'degraded': False, 'available': False}) + @patch('mes_dashboard.core.circuit_breaker.get_circuit_breaker_status', return_value={'state': 'CLOSED', 'enabled': True, 'failure_count': 0, 'success_count': 0, 'total_count': 0, 'failure_rate': 0.0}) + def test_health_all_ok(self, mock_cb, mock_route_cache, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache): """Test health endpoint returns 200 when all services are healthy.""" mock_check_db.return_value = ('ok', None) mock_check_redis.return_value = ('ok', None) @@ -84,7 +86,9 @@ class TestHealthEndpoint: @patch('mes_dashboard.routes.health_routes.check_database') @patch('mes_dashboard.routes.health_routes.check_redis') @patch('mes_dashboard.routes.health_routes.get_cache_status') - def test_health_redis_disabled(self, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache): + @patch('mes_dashboard.routes.health_routes.get_route_cache_status', return_value={'mode': 'none', 'degraded': False, 'available': False}) + @patch('mes_dashboard.core.circuit_breaker.get_circuit_breaker_status', return_value={'state': 'CLOSED', 'enabled': True, 'failure_count': 0, 'success_count': 0, 'total_count': 0, 'failure_rate': 0.0}) + def test_health_redis_disabled(self, mock_cb, mock_route_cache, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache): """Test health endpoint shows Redis disabled status.""" mock_check_db.return_value = ('ok', None) mock_check_redis.return_value = ('disabled', None) @@ -105,15 +109,15 @@ class TestWipApiWithCache: @pytest.fixture def mock_wip_cache_data(self): """Create mock WIP data for cache.""" - return pd.DataFrame({ - 'LOTID': ['LOT001', 'LOT002', 'LOT003'], - 'QTY': [100, 200, 150], - 'WORKORDER': ['WO001', 'WO002', 'WO003'], - 'WORKCENTER_GROUP': ['WC1', 'WC1', 'WC2'], - 'WORKCENTERSEQUENCE_GROUP': [1, 1, 2], - 'PACKAGE_LEF': ['PKG1', 'PKG2', 'PKG1'], - 'PRODUCTLINENAME': ['PKG1', 'PKG2', 'PKG1'], - 'EQUIPMENTCOUNT': [1, 0, 0], + return pd.DataFrame({ + 'LOTID': ['LOT001', 'LOT002', 'LOT003'], + 'QTY': [100, 200, 150], + 'WORKORDER': ['WO001', 'WO002', 'WO003'], + 'WORKCENTER_GROUP': ['WC1', 'WC1', 'WC2'], + 'WORKCENTERSEQUENCE_GROUP': [1, 1, 2], + 'PACKAGE_LEF': ['PKG1', 'PKG2', 'PKG1'], + 'PRODUCTLINENAME': ['PKG1', 'PKG2', 'PKG1'], + 'EQUIPMENTCOUNT': [1, 0, 0], 'CURRENTHOLDCOUNT': [0, 1, 0], 'HOLDREASONNAME': [None, 'Quality Issue', None], 'STATUS': ['ACTIVE', 'HOLD', 'ACTIVE'], diff --git a/tests/test_cutover_gates.py b/tests/test_cutover_gates.py deleted file mode 100644 index 9507249..0000000 --- a/tests/test_cutover_gates.py +++ /dev/null @@ -1,184 +0,0 @@ -# -*- coding: utf-8 -*- -"""Cutover gate enforcement tests for portal shell route-view migration.""" - -from __future__ import annotations - -import json -from pathlib import Path - -import pytest - -from mes_dashboard.app import create_app - -ROOT = Path(__file__).resolve().parents[1] -BASELINE_DIR = ROOT / "docs" / "migration" / "portal-shell-route-view-integration" - -pytestmark = pytest.mark.skipif( - not BASELINE_DIR.exists(), - reason=f"Migration baseline directory missing: {BASELINE_DIR}", -) -BASELINE_VISIBILITY_FILE = BASELINE_DIR / "baseline_drawer_visibility.json" -BASELINE_API_FILE = BASELINE_DIR / "baseline_api_payload_contracts.json" -GATE_REPORT_FILE = BASELINE_DIR / "cutover-gates-report.json" -WAVE_A_EVIDENCE_FILE = BASELINE_DIR / "wave-a-smoke-evidence.json" -WAVE_B_EVIDENCE_FILE = BASELINE_DIR / "wave-b-native-smoke-evidence.json" -WAVE_B_PARITY_FILE = BASELINE_DIR / "wave-b-parity-evidence.json" -VISUAL_SNAPSHOT_FILE = BASELINE_DIR / "visual-regression-snapshots.json" -ROLLBACK_RUNBOOK = BASELINE_DIR / "rollback-rehearsal-shell-route-view.md" -KILL_SWITCH_DOC = BASELINE_DIR / "kill-switch-operations.md" -OBSERVABILITY_REPORT = BASELINE_DIR / "migration-observability-report.md" -STRESS_SUITE = ROOT / "tests" / "stress" / "test_frontend_stress.py" - - -def _read_json(path: Path) -> dict: - return json.loads(path.read_text(encoding="utf-8")) - - -def _login_as_admin(client) -> None: - with client.session_transaction() as sess: - sess["admin"] = {"displayName": "Admin", "employeeNo": "A001"} - - -def _route_set(drawers: list[dict]) -> set[str]: - return { - str(page.get("route")) - for drawer in drawers - for page in drawer.get("pages", []) - if page.get("route") - } - - -def test_g1_route_availability_gate_p0_routes_are_2xx_or_3xx(): - app = create_app("testing") - app.config["TESTING"] = True - client = app.test_client() - _login_as_admin(client) - - p0_routes = [ - "/", - "/portal-shell", - "/api/portal/navigation", - "/wip-overview", - "/resource", - "/qc-gate", - "/job-query", - "/excel-query", - "/query-tool", - ] - - statuses = [client.get(route).status_code for route in p0_routes] - assert all(200 <= status < 400 for status in statuses), statuses - - -def test_g2_drawer_parity_gate_matches_baseline_for_admin_and_non_admin(): - baseline = _read_json(BASELINE_VISIBILITY_FILE) - app = create_app("testing") - app.config["TESTING"] = True - - non_admin_client = app.test_client() - non_admin_payload = _read_json_response(non_admin_client.get("/api/portal/navigation")) - - admin_client = app.test_client() - _login_as_admin(admin_client) - admin_payload = _read_json_response(admin_client.get("/api/portal/navigation")) - - assert _route_set(non_admin_payload["drawers"]) == _route_set(baseline["non_admin"]) - assert _route_set(admin_payload["drawers"]) == _route_set(baseline["admin"]) - - -def test_g3_smoke_evidence_gate_requires_wave_a_and_wave_b_pass(): - wave_a = _read_json(WAVE_A_EVIDENCE_FILE) - wave_b = _read_json(WAVE_B_EVIDENCE_FILE) - - for payload in (wave_a, wave_b): - assert payload["execution"]["automated_runs"] - for run in payload["execution"]["automated_runs"]: - assert run["status"] == "pass" - for route, result in payload["pages"].items(): - assert result["status"] == "pass", f"smoke evidence failed: {route}" - assert result["critical_failures"] == [] - - -def test_g4_no_iframe_gate_blocks_if_shell_uses_iframe(): - stress_source = STRESS_SUITE.read_text(encoding="utf-8") - assert "Portal should not render iframe after migration" in stress_source - assert "iframe_count = page.locator('iframe').count()" in stress_source - - report = _read_json(GATE_REPORT_FILE) - g4 = next(g for g in report["gates"] if g["id"] == "G4") - assert g4["status"] == "pass" - assert g4["block_on_fail"] is True - - -def test_g5_route_query_compatibility_gate_checks_contracts(): - baseline = _read_json(BASELINE_API_FILE) - app = create_app("testing") - app.config["TESTING"] = True - registered_routes = {rule.rule for rule in app.url_map.iter_rules()} - - for api_route, contract in baseline.get("apis", {}).items(): - assert api_route in registered_routes, f"Missing API route in app map: {api_route}" - required_keys = contract.get("required_keys", []) - assert required_keys, f"No required_keys defined for {api_route}" - assert all(isinstance(key, str) and key for key in required_keys) - - report = _read_json(GATE_REPORT_FILE) - g5 = next(g for g in report["gates"] if g["id"] == "G5") - assert g5["status"] == "pass" - assert g5["block_on_fail"] is True - - -def test_g6_parity_gate_requires_table_chart_filter_interaction_matrix_pass(): - parity = _read_json(WAVE_B_PARITY_FILE) - for route, checks in parity["pages"].items(): - for dimension in ("table", "chart", "filter", "interaction", "matrix"): - status = checks[dimension]["status"] - assert status in {"pass", "n/a"}, f"{route} parity failed on {dimension}: {status}" - - snapshots = _read_json(VISUAL_SNAPSHOT_FILE) - assert snapshots["critical_diff_policy"]["block_release"] is True - assert len(snapshots["snapshots"]) >= 4 - - report = _read_json(GATE_REPORT_FILE) - g6 = next(g for g in report["gates"] if g["id"] == "G6") - assert g6["status"] == "pass" - assert g6["block_on_fail"] is True - - -def test_g7_rollback_gate_has_recovery_slo_and_kill_switch_steps(): - rehearsal = ROLLBACK_RUNBOOK.read_text(encoding="utf-8") - kill_switch = KILL_SWITCH_DOC.read_text(encoding="utf-8") - - assert "15 minutes" in rehearsal - assert "PORTAL_SPA_ENABLED=false" in rehearsal - assert "PORTAL_SPA_ENABLED=false" in kill_switch - assert "/api/portal/navigation" in kill_switch - assert "/health" in kill_switch - - report = _read_json(GATE_REPORT_FILE) - g7 = next(g for g in report["gates"] if g["id"] == "G7") - assert g7["status"] == "pass" - assert g7["block_on_fail"] is True - - -def test_release_block_semantics_enforced_by_gate_report(): - report = _read_json(GATE_REPORT_FILE) - assert report["policy"]["block_on_any_failed_gate"] is True - assert report["policy"]["block_on_incomplete_smoke_evidence"] is True - assert report["policy"]["block_on_critical_parity_failure"] is True - - for gate in report["gates"]: - assert gate["status"] == "pass" - assert gate["block_on_fail"] is True - assert report["release_blocked"] is False - - -def test_observability_report_covers_route_errors_health_and_fallback_usage(): - content = OBSERVABILITY_REPORT.read_text(encoding="utf-8") - assert "route errors" in content.lower() - assert "health regressions" in content.lower() - assert "wrapper fallback usage" in content.lower() - - -def _read_json_response(response) -> dict: - return json.loads(response.data.decode("utf-8")) diff --git a/tests/test_health_routes.py b/tests/test_health_routes.py index 0c64aa6..e437aab 100644 --- a/tests/test_health_routes.py +++ b/tests/test_health_routes.py @@ -21,7 +21,9 @@ def _client(): @patch('mes_dashboard.routes.health_routes.check_database', return_value=('ok', None)) @patch('mes_dashboard.routes.health_routes.check_redis', return_value=('error', 'redis-down')) @patch('mes_dashboard.routes.health_routes.get_route_cache_status') +@patch('mes_dashboard.core.circuit_breaker.get_circuit_breaker_status', return_value={'state': 'CLOSED', 'enabled': True, 'failure_count': 0, 'success_count': 0, 'total_count': 0, 'failure_rate': 0.0}) def test_health_includes_route_cache_and_degraded_warning( + _mock_cb, mock_route_cache, _mock_redis, _mock_db, diff --git a/tests/test_route_query_compatibility.py b/tests/test_route_query_compatibility.py deleted file mode 100644 index 277d83d..0000000 --- a/tests/test_route_query_compatibility.py +++ /dev/null @@ -1,54 +0,0 @@ -# -*- coding: utf-8 -*- -"""Route/query compatibility tests for shell list-detail workflows.""" - -from __future__ import annotations - -import json -from pathlib import Path - -import pytest - -ROOT = Path(__file__).resolve().parents[1] -BASELINE_DIR = ROOT / "docs" / "migration" / "portal-shell-route-view-integration" -BASELINE_ROUTE_QUERY_FILE = BASELINE_DIR / "baseline_route_query_contracts.json" - -pytestmark = pytest.mark.skipif( - not BASELINE_DIR.exists(), - reason=f"Migration baseline directory missing: {BASELINE_DIR}", -) - - -def _read_json(path: Path) -> dict: - return json.loads(path.read_text(encoding="utf-8")) - - -def test_wip_list_detail_query_contract_compatibility(): - routes = _read_json(BASELINE_ROUTE_QUERY_FILE)["routes"] - - overview_keys = set(routes["/wip-overview"]["query_keys"]) - detail_keys = set(routes["/wip-detail"]["query_keys"]) - - assert {"workorder", "lotid", "package", "type", "status"}.issubset(overview_keys) - assert overview_keys.issubset(detail_keys) - assert "workcenter" in detail_keys - - -def test_hold_list_detail_query_contract_compatibility(): - routes = _read_json(BASELINE_ROUTE_QUERY_FILE)["routes"] - - detail_keys = set(routes["/hold-detail"]["query_keys"]) - history_keys = set(routes["/hold-history"]["query_keys"]) - - assert "reason" in detail_keys - # Hold history route intentionally supports optional query keys at runtime. - assert routes["/hold-history"]["render_mode"] == "native" - assert routes["/hold-detail"]["render_mode"] == "native" - assert isinstance(history_keys, set) - - -def test_wave_b_routes_keep_native_render_mode_with_query_contract_object(): - routes = _read_json(BASELINE_ROUTE_QUERY_FILE)["routes"] - for route in ["/job-query", "/excel-query", "/query-tool", "/tmtt-defect"]: - entry = routes[route] - assert entry["render_mode"] == "native" - assert isinstance(entry["query_keys"], list) diff --git a/tests/test_route_view_migration_baseline.py b/tests/test_route_view_migration_baseline.py deleted file mode 100644 index 859f02f..0000000 --- a/tests/test_route_view_migration_baseline.py +++ /dev/null @@ -1,139 +0,0 @@ -# -*- coding: utf-8 -*- -"""Validation tests for shell route-view migration baseline artifacts.""" - -from __future__ import annotations - -import json -import copy -from pathlib import Path - -import pytest - -from mes_dashboard.app import create_app -from mes_dashboard.services.navigation_contract import ( - compute_drawer_visibility, - validate_route_migration_contract, - validate_wave_b_rewrite_entry_criteria, -) - - -ROOT = Path(__file__).resolve().parent.parent -PAGE_STATUS_FILE = ROOT / "data" / "page_status.json" -BASELINE_DIR = ROOT / "docs" / "migration" / "portal-shell-route-view-integration" - -pytestmark = pytest.mark.skipif( - not BASELINE_DIR.exists(), - reason=f"Migration baseline directory missing: {BASELINE_DIR}", -) - -BASELINE_VISIBILITY_FILE = BASELINE_DIR / "baseline_drawer_visibility.json" -BASELINE_ROUTE_QUERY_FILE = BASELINE_DIR / "baseline_route_query_contracts.json" -BASELINE_INTERACTION_FILE = BASELINE_DIR / "baseline_interaction_evidence.json" -ROUTE_CONTRACT_FILE = BASELINE_DIR / "route_migration_contract.json" -ROUTE_CONTRACT_VALIDATION_FILE = BASELINE_DIR / "route_migration_contract_validation.json" -WAVE_B_REWRITE_ENTRY_FILE = BASELINE_DIR / "wave-b-rewrite-entry-criteria.json" - -REQUIRED_ROUTES = { - "/wip-overview", - "/wip-detail", - "/hold-overview", - "/hold-detail", - "/hold-history", - "/resource", - "/resource-history", - "/qc-gate", - "/job-query", - "/excel-query", - "/query-tool", - "/tmtt-defect", -} - - -def _read_json(path: Path) -> dict: - return json.loads(path.read_text(encoding="utf-8")) - - -def test_route_migration_contract_has_no_validation_errors(): - contract = _read_json(ROUTE_CONTRACT_FILE) - errors = validate_route_migration_contract(contract, required_routes=REQUIRED_ROUTES) - assert errors == [] - - validation_payload = _read_json(ROUTE_CONTRACT_VALIDATION_FILE) - assert validation_payload["errors"] == [] - - -def test_wave_b_rewrite_entry_criteria_blocks_premature_native_cutover(): - contract = _read_json(ROUTE_CONTRACT_FILE) - rewrite_entry = _read_json(WAVE_B_REWRITE_ENTRY_FILE) - - # Current baseline has complete evidence for Wave B native routes. - assert validate_wave_b_rewrite_entry_criteria(contract, rewrite_entry) == [] - - # Simulate incomplete criteria while route already in native mode. - mutated_criteria = copy.deepcopy(rewrite_entry) - mutated_criteria["pages"]["/job-query"]["evidence"]["parity"] = "pending" - mutated_criteria["pages"]["/job-query"]["native_cutover_ready"] = False - mutated_criteria["pages"]["/job-query"]["block_reason"] = "pending parity" - - errors = validate_wave_b_rewrite_entry_criteria(contract, mutated_criteria) - assert "native cutover blocked for /job-query: rewrite criteria incomplete" in errors - - -def test_baseline_visibility_matches_current_registry_state(): - page_status = _read_json(PAGE_STATUS_FILE) - baseline = _read_json(BASELINE_VISIBILITY_FILE) - - assert baseline["admin"] == compute_drawer_visibility(page_status, is_admin=True) - assert baseline["non_admin"] == compute_drawer_visibility(page_status, is_admin=False) - - -def test_baseline_route_query_contract_covers_all_target_routes(): - baseline = _read_json(BASELINE_ROUTE_QUERY_FILE) - routes = baseline["routes"] - - assert set(routes.keys()) == REQUIRED_ROUTES - for route in REQUIRED_ROUTES: - assert "query_keys" in routes[route] - assert "render_mode" in routes[route] - assert routes[route]["render_mode"] in {"native", "wrapper"} - - -def test_interaction_evidence_contains_required_sections_for_all_routes(): - payload = _read_json(BASELINE_INTERACTION_FILE) - routes = payload["routes"] - - assert set(routes.keys()) == REQUIRED_ROUTES - for route in REQUIRED_ROUTES: - entry = routes[route] - assert "table" in entry - assert "chart" in entry - assert "filter" in entry - assert "matrix" in entry - - -def test_navigation_api_drawer_parity_matches_shell_baseline_for_admin_and_non_admin(): - app = create_app("testing") - app.config["TESTING"] = True - baseline = _read_json(BASELINE_VISIBILITY_FILE) - - non_admin_client = app.test_client() - non_admin_payload = _read_response_json(non_admin_client.get("/api/portal/navigation")) - assert _route_set(non_admin_payload["drawers"]) == _route_set(baseline["non_admin"]) - - admin_client = app.test_client() - with admin_client.session_transaction() as sess: - sess["admin"] = {"displayName": "Admin", "employeeNo": "A001"} - admin_payload = _read_response_json(admin_client.get("/api/portal/navigation")) - assert _route_set(admin_payload["drawers"]) == _route_set(baseline["admin"]) - - -def _read_response_json(response) -> dict: - return json.loads(response.data.decode("utf-8")) - - -def _route_set(drawers: list[dict]) -> set[str]: - return { - page["route"] - for drawer in drawers - for page in drawer.get("pages", []) - } diff --git a/tests/test_visual_regression_snapshots.py b/tests/test_visual_regression_snapshots.py deleted file mode 100644 index ad442d6..0000000 --- a/tests/test_visual_regression_snapshots.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: utf-8 -*- -"""Visual regression snapshot contract checks for migration-critical states.""" - -from __future__ import annotations - -import hashlib -import json -from pathlib import Path - -import pytest - -ROOT = Path(__file__).resolve().parents[1] -BASELINE_DIR = ROOT / "docs" / "migration" / "portal-shell-route-view-integration" -SNAPSHOT_FILE = BASELINE_DIR / "visual-regression-snapshots.json" - -pytestmark = pytest.mark.skipif( - not BASELINE_DIR.exists(), - reason=f"Migration baseline directory missing: {BASELINE_DIR}", -) - - -def _read_json(path: Path) -> dict: - return json.loads(path.read_text(encoding="utf-8")) - - -def _sha256_text(text: str) -> str: - return hashlib.sha256(text.encode("utf-8")).hexdigest() - - -def _compute_fingerprint(files: list[str]) -> str: - lines: list[str] = [] - for rel in files: - path = ROOT / rel - assert path.exists(), f"snapshot file missing: {rel}" - digest = hashlib.sha256(path.read_bytes()).hexdigest() - lines.append(rel) - lines.append(digest) - payload = "\n".join(lines) + "\n" - return _sha256_text(payload) - - -def test_visual_snapshot_policy_blocks_release_on_critical_diff(): - payload = _read_json(SNAPSHOT_FILE) - policy = payload["critical_diff_policy"] - assert policy["block_release"] is True - assert policy["severity"] == "critical" - - -def test_visual_snapshot_fingerprints_match_current_sources(): - payload = _read_json(SNAPSHOT_FILE) - snapshots = payload.get("snapshots", []) - assert snapshots, "no visual snapshot entries" - - for item in snapshots: - files = item.get("files", []) - expected = str(item.get("fingerprint", "")).strip() - assert files and expected, f"invalid snapshot entry: {item.get('id')}" - - actual = _compute_fingerprint(files) - assert actual == expected, f"critical visual snapshot diff: {item.get('id')}" diff --git a/tests/test_wip_service.py b/tests/test_wip_service.py index fd29359..4580bb0 100644 --- a/tests/test_wip_service.py +++ b/tests/test_wip_service.py @@ -4,13 +4,13 @@ Tests the WIP query functions that use DW_MES_LOT_V view. """ -import unittest -from unittest.mock import patch, MagicMock -from functools import wraps -import pandas as pd -import threading -import time -from concurrent.futures import ThreadPoolExecutor +import unittest +from unittest.mock import patch, MagicMock +from functools import wraps +import pandas as pd +import threading +import time +from concurrent.futures import ThreadPoolExecutor from mes_dashboard.services.wip_service import ( WIP_VIEW, @@ -20,13 +20,13 @@ from mes_dashboard.services.wip_service import ( get_wip_detail, get_hold_detail_summary, get_hold_detail_lots, - get_hold_overview_treemap, - get_workcenters, - get_packages, - get_wip_filter_options, - search_workorders, - search_lot_ids, -) + get_hold_overview_treemap, + get_workcenters, + get_packages, + get_wip_filter_options, + search_workorders, + search_lot_ids, +) def disable_cache(func): @@ -257,7 +257,7 @@ class TestGetWorkcenters(unittest.TestCase): self.assertIsNone(result) -class TestGetPackages(unittest.TestCase): +class TestGetPackages(unittest.TestCase): """Test get_packages function.""" @disable_cache @@ -283,90 +283,90 @@ class TestGetPackages(unittest.TestCase): """Should return empty list when no packages.""" mock_read_sql.return_value = pd.DataFrame() - result = get_packages() - - self.assertEqual(result, []) - - -class TestGetWipFilterOptions(unittest.TestCase): - """Test get_wip_filter_options function.""" - - def setUp(self): - import mes_dashboard.services.wip_service as wip_service - with wip_service._wip_search_index_lock: - wip_service._wip_search_index_cache.clear() - with wip_service._wip_snapshot_lock: - wip_service._wip_snapshot_cache.clear() - - @patch('mes_dashboard.services.wip_service._get_wip_search_index') - def test_prefers_search_index_payload(self, mock_get_index): - mock_get_index.return_value = { - 'workorders': ['WO1', 'WO2'], - 'lotids': ['LOT1'], - 'packages': ['PKG1'], - 'types': ['TYPE1'], - 'firstnames': ['WF001'], - 'waferdescs': ['SiC'], - } - - result = get_wip_filter_options() - - self.assertEqual(result['workorders'], ['WO1', 'WO2']) - self.assertEqual(result['firstnames'], ['WF001']) - self.assertEqual(result['waferdescs'], ['SiC']) - - @patch('mes_dashboard.services.wip_service._get_wip_search_index', return_value=None) - @patch('mes_dashboard.services.wip_service._get_wip_dataframe') - def test_interdependent_options_follow_cross_filters(self, mock_cached_wip, _mock_get_index): - mock_cached_wip.return_value = pd.DataFrame({ - 'WORKORDER': ['WO1', 'WO1', 'WO2'], - 'LOTID': ['L1', 'L2', 'L3'], - 'PACKAGE_LEF': ['PKG-A', 'PKG-B', 'PKG-B'], - 'PJ_TYPE': ['TYPE-1', 'TYPE-1', 'TYPE-2'], - 'FIRSTNAME': ['WF-A', 'WF-B', 'WF-A'], - 'WAFERDESC': ['SiC', 'SiC', 'Si'], - 'EQUIPMENTCOUNT': [0, 1, 0], - 'CURRENTHOLDCOUNT': [1, 0, 0], - 'QTY': [10, 20, 30], - 'HOLDREASONNAME': ['Q-Check', None, None], - 'WORKCENTER_GROUP': ['WC-A', 'WC-A', 'WC-B'], - }) - - result = get_wip_filter_options(workorder='WO1') - - # Exclude-self semantics: workorder options still show values allowed by other filters. - self.assertEqual(result['workorders'], ['WO1', 'WO2']) - self.assertEqual(result['lotids'], ['L1', 'L2']) - self.assertEqual(result['types'], ['TYPE-1']) - self.assertEqual(result['waferdescs'], ['SiC']) - - @patch('mes_dashboard.services.wip_service._get_wip_search_index', return_value=None) - @patch('mes_dashboard.services.wip_service._select_with_snapshot_indexes') - @patch('mes_dashboard.services.wip_service._get_wip_dataframe') - def test_falls_back_to_cache_dataframe( - self, - mock_cached_wip, - mock_select_with_snapshot, - _mock_get_index, - ): - mock_cached_wip.return_value = pd.DataFrame({'WORKORDER': ['WO1']}) - mock_select_with_snapshot.return_value = pd.DataFrame({ - 'WORKORDER': ['WO2', 'WO1'], - 'LOTID': ['LOT2', 'LOT1'], - 'PACKAGE_LEF': ['PKG2', 'PKG1'], - 'PJ_TYPE': ['TYPE2', 'TYPE1'], - 'FIRSTNAME': ['WF002', 'WF001'], - 'WAFERDESC': ['Si', 'SiC'], - }) - - result = get_wip_filter_options() - - self.assertEqual(result['workorders'], ['WO1', 'WO2']) - self.assertEqual(result['firstnames'], ['WF001', 'WF002']) - self.assertEqual(result['waferdescs'], ['Si', 'SiC']) - - -class TestSearchWorkorders(unittest.TestCase): + result = get_packages() + + self.assertEqual(result, []) + + +class TestGetWipFilterOptions(unittest.TestCase): + """Test get_wip_filter_options function.""" + + def setUp(self): + import mes_dashboard.services.wip_service as wip_service + with wip_service._wip_search_index_lock: + wip_service._wip_search_index_cache.clear() + with wip_service._wip_snapshot_lock: + wip_service._wip_snapshot_cache.clear() + + @patch('mes_dashboard.services.wip_service._get_wip_search_index') + def test_prefers_search_index_payload(self, mock_get_index): + mock_get_index.return_value = { + 'workorders': ['WO1', 'WO2'], + 'lotids': ['LOT1'], + 'packages': ['PKG1'], + 'types': ['TYPE1'], + 'firstnames': ['WF001'], + 'waferdescs': ['SiC'], + } + + result = get_wip_filter_options() + + self.assertEqual(result['workorders'], ['WO1', 'WO2']) + self.assertEqual(result['firstnames'], ['WF001']) + self.assertEqual(result['waferdescs'], ['SiC']) + + @patch('mes_dashboard.services.wip_service._get_wip_search_index', return_value=None) + @patch('mes_dashboard.services.wip_service._get_wip_dataframe') + def test_interdependent_options_follow_cross_filters(self, mock_cached_wip, _mock_get_index): + mock_cached_wip.return_value = pd.DataFrame({ + 'WORKORDER': ['WO1', 'WO1', 'WO2'], + 'LOTID': ['L1', 'L2', 'L3'], + 'PACKAGE_LEF': ['PKG-A', 'PKG-B', 'PKG-B'], + 'PJ_TYPE': ['TYPE-1', 'TYPE-1', 'TYPE-2'], + 'FIRSTNAME': ['WF-A', 'WF-B', 'WF-A'], + 'WAFERDESC': ['SiC', 'SiC', 'Si'], + 'EQUIPMENTCOUNT': [0, 1, 0], + 'CURRENTHOLDCOUNT': [1, 0, 0], + 'QTY': [10, 20, 30], + 'HOLDREASONNAME': ['Q-Check', None, None], + 'WORKCENTER_GROUP': ['WC-A', 'WC-A', 'WC-B'], + }) + + result = get_wip_filter_options(workorder='WO1') + + # Exclude-self semantics: workorder options still show values allowed by other filters. + self.assertEqual(result['workorders'], ['WO1', 'WO2']) + self.assertEqual(result['lotids'], ['L1', 'L2']) + self.assertEqual(result['types'], ['TYPE-1']) + self.assertEqual(result['waferdescs'], ['SiC']) + + @patch('mes_dashboard.services.wip_service._get_wip_search_index', return_value=None) + @patch('mes_dashboard.services.wip_service._select_with_snapshot_indexes') + @patch('mes_dashboard.services.wip_service._get_wip_dataframe') + def test_falls_back_to_cache_dataframe( + self, + mock_cached_wip, + mock_select_with_snapshot, + _mock_get_index, + ): + mock_cached_wip.return_value = pd.DataFrame({'WORKORDER': ['WO1']}) + mock_select_with_snapshot.return_value = pd.DataFrame({ + 'WORKORDER': ['WO2', 'WO1'], + 'LOTID': ['LOT2', 'LOT1'], + 'PACKAGE_LEF': ['PKG2', 'PKG1'], + 'PJ_TYPE': ['TYPE2', 'TYPE1'], + 'FIRSTNAME': ['WF002', 'WF001'], + 'WAFERDESC': ['Si', 'SiC'], + }) + + result = get_wip_filter_options() + + self.assertEqual(result['workorders'], ['WO1', 'WO2']) + self.assertEqual(result['firstnames'], ['WF001', 'WF002']) + self.assertEqual(result['waferdescs'], ['Si', 'SiC']) + + +class TestSearchWorkorders(unittest.TestCase): """Test search_workorders function.""" @disable_cache @@ -534,7 +534,7 @@ class TestSearchLotIds(unittest.TestCase): self.assertIn("LOTID NOT LIKE '%DUMMY%'", call_args) -class TestWipSearchIndexShortcut(unittest.TestCase): +class TestWipSearchIndexShortcut(unittest.TestCase): """Test derived search index fast-path behavior.""" @patch('mes_dashboard.services.wip_service._search_workorders_from_oracle') @@ -559,72 +559,72 @@ class TestWipSearchIndexShortcut(unittest.TestCase): result = search_workorders("GA26", package="SOT-23") - self.assertEqual(result, ["GA26012001"]) - mock_oracle.assert_called_once() - - -class TestWipSnapshotLocking(unittest.TestCase): - """Concurrency behavior for snapshot cache build path.""" - - def setUp(self): - import mes_dashboard.services.wip_service as wip_service - with wip_service._wip_snapshot_lock: - wip_service._wip_snapshot_cache.clear() - - @staticmethod - def _sample_df() -> pd.DataFrame: - return pd.DataFrame({ - "WORKORDER": ["WO1", "WO2"], - "LOTID": ["LOT1", "LOT2"], - "QTY": [100, 200], - "EQUIPMENTCOUNT": [1, 0], - "CURRENTHOLDCOUNT": [0, 1], - "HOLDREASONNAME": [None, "品質確認"], - "WORKCENTER_GROUP": ["WC-A", "WC-B"], - "PACKAGE_LEF": ["PKG-A", "PKG-B"], - "PJ_TYPE": ["T1", "T2"], - }) - - def test_concurrent_snapshot_miss_builds_once(self): - import mes_dashboard.services.wip_service as wip_service - - df = self._sample_df() - build_count_lock = threading.Lock() - build_count = 0 - - def slow_build(snapshot_df, include_dummy, version): - nonlocal build_count - with build_count_lock: - build_count += 1 - time.sleep(0.05) - return { - "version": version, - "built_at": "2026-02-10T00:00:00", - "row_count": int(len(snapshot_df)), - "frame": snapshot_df, - "indexes": {}, - "frame_bytes": 0, - "index_bucket_count": 0, - } - - start_event = threading.Event() - - def call_snapshot(): - start_event.wait(timeout=1) - return wip_service._get_wip_snapshot(include_dummy=False) - - with patch.object(wip_service, "_get_wip_cache_version", return_value="version-1"): - with patch.object(wip_service, "_get_wip_dataframe", return_value=df) as mock_get_df: - with patch.object(wip_service, "_build_wip_snapshot", side_effect=slow_build): - with ThreadPoolExecutor(max_workers=6) as pool: - futures = [pool.submit(call_snapshot) for _ in range(6)] - start_event.set() - results = [future.result(timeout=3) for future in futures] - - self.assertEqual(build_count, 1) - self.assertEqual(mock_get_df.call_count, 1) - self.assertTrue(all(result is not None for result in results)) - self.assertTrue(all(result.get("version") == "version-1" for result in results)) + self.assertEqual(result, ["GA26012001"]) + mock_oracle.assert_called_once() + + +class TestWipSnapshotLocking(unittest.TestCase): + """Concurrency behavior for snapshot cache build path.""" + + def setUp(self): + import mes_dashboard.services.wip_service as wip_service + with wip_service._wip_snapshot_lock: + wip_service._wip_snapshot_cache.clear() + + @staticmethod + def _sample_df() -> pd.DataFrame: + return pd.DataFrame({ + "WORKORDER": ["WO1", "WO2"], + "LOTID": ["LOT1", "LOT2"], + "QTY": [100, 200], + "EQUIPMENTCOUNT": [1, 0], + "CURRENTHOLDCOUNT": [0, 1], + "HOLDREASONNAME": [None, "品質確認"], + "WORKCENTER_GROUP": ["WC-A", "WC-B"], + "PACKAGE_LEF": ["PKG-A", "PKG-B"], + "PJ_TYPE": ["T1", "T2"], + }) + + def test_concurrent_snapshot_miss_builds_once(self): + import mes_dashboard.services.wip_service as wip_service + + df = self._sample_df() + build_count_lock = threading.Lock() + build_count = 0 + + def slow_build(snapshot_df, include_dummy, version): + nonlocal build_count + with build_count_lock: + build_count += 1 + time.sleep(0.05) + return { + "version": version, + "built_at": "2026-02-10T00:00:00", + "row_count": int(len(snapshot_df)), + "frame": snapshot_df, + "indexes": {}, + "frame_bytes": 0, + "index_bucket_count": 0, + } + + start_event = threading.Event() + + def call_snapshot(): + start_event.wait(timeout=1) + return wip_service._get_wip_snapshot(include_dummy=False) + + with patch.object(wip_service, "_get_wip_cache_version", return_value="version-1"): + with patch.object(wip_service, "_get_wip_dataframe", return_value=df) as mock_get_df: + with patch.object(wip_service, "_build_wip_snapshot", side_effect=slow_build): + with ThreadPoolExecutor(max_workers=6) as pool: + futures = [pool.submit(call_snapshot) for _ in range(6)] + start_event.set() + results = [future.result(timeout=3) for future in futures] + + self.assertEqual(build_count, 1) + self.assertEqual(mock_get_df.call_count, 1) + self.assertTrue(all(result is not None for result in results)) + self.assertTrue(all(result.get("version") == "version-1" for result in results)) class TestDummyExclusionInAllFunctions(unittest.TestCase): @@ -767,9 +767,9 @@ class TestMultipleFilterConditions(unittest.TestCase): sql = call_args[0][0] params = call_args[0][1] if len(call_args[0]) > 1 else {} - self.assertIn("WORKORDER", sql) - self.assertIn("LOTID", sql) - self.assertIn("LIKE", sql) + self.assertIn("WORKORDER", sql) + self.assertIn("LOTID", sql) + self.assertIn("LIKE", sql) self.assertIn("LOTID NOT LIKE '%DUMMY%'", sql) # Verify params contain the search patterns self.assertTrue(any('%GA26%' in str(v) for v in params.values())) @@ -794,9 +794,9 @@ class TestMultipleFilterConditions(unittest.TestCase): sql = call_args[0][0] params = call_args[0][1] if len(call_args[0]) > 1 else {} - self.assertIn("WORKORDER", sql) - self.assertIn("LOTID", sql) - self.assertIn("LIKE", sql) + self.assertIn("WORKORDER", sql) + self.assertIn("LOTID", sql) + self.assertIn("LIKE", sql) # Should NOT contain DUMMY exclusion since include_dummy=True self.assertNotIn("LOTID NOT LIKE '%DUMMY%'", sql) # Verify params contain the search patterns @@ -963,6 +963,14 @@ class TestWipServiceIntegration: python -m pytest tests/test_wip_service.py -k Integration --run-integration """ + @pytest.fixture(autouse=True) + def _reset_db_state(self): + """Reset DB engine and circuit breaker so integration tests start clean.""" + import mes_dashboard.core.database as _db + from mes_dashboard.core.circuit_breaker import get_database_circuit_breaker + _db._ENGINE = None + get_database_circuit_breaker().reset() + @pytest.mark.integration def test_get_wip_summary_integration(self): """Integration test for get_wip_summary."""