feat: complete dashboard-vite parity and fix portal health/csp regressions

This commit is contained in:
egg
2026-02-09 09:22:23 +08:00
parent cf194bc3a3
commit 1e6d6dbd31
57 changed files with 13347 additions and 312 deletions

View File

@@ -1,15 +1,27 @@
# -*- coding: utf-8 -*-
"""Pytest configuration and fixtures for MES Dashboard tests."""
import pytest
import sys
import os
# Add the src directory to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
import pytest
import sys
import os
# Add the src directory to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
_TMP_DIR = os.path.join(_PROJECT_ROOT, 'tmp')
# Test baseline env: keep pytest isolated from local runtime/.env side effects.
os.environ.setdefault('FLASK_ENV', 'testing')
os.environ.setdefault('REDIS_ENABLED', 'false')
os.environ.setdefault('RUNTIME_CONTRACT_ENFORCE', 'false')
os.environ.setdefault('SLOW_QUERY_THRESHOLD', '1.0')
os.environ.setdefault('WATCHDOG_RUNTIME_DIR', _TMP_DIR)
os.environ.setdefault('WATCHDOG_RESTART_FLAG', os.path.join(_TMP_DIR, 'mes_dashboard_restart.flag'))
os.environ.setdefault('WATCHDOG_PID_FILE', os.path.join(_TMP_DIR, 'gunicorn.pid'))
os.environ.setdefault('WATCHDOG_STATE_FILE', os.path.join(_TMP_DIR, 'mes_dashboard_restart_state.json'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
@pytest.fixture

View File

@@ -5,19 +5,18 @@ These tests simulate real user workflows through the admin authentication system
Run with: pytest tests/e2e/test_admin_auth_e2e.py -v --run-integration
"""
import json
import pytest
from unittest.mock import patch, MagicMock
import tempfile
from pathlib import Path
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
from mes_dashboard.services import page_registry
import json
import pytest
from unittest.mock import patch
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
from mes_dashboard.services import page_registry
from mes_dashboard.routes import auth_routes
@pytest.fixture
@@ -39,54 +38,57 @@ def temp_page_status(tmp_path):
@pytest.fixture
def app(temp_page_status):
"""Create application for testing."""
db._ENGINE = None
def app(temp_page_status):
"""Create application for testing."""
db._ENGINE = None
# Mock page registry
original_data_file = page_registry.DATA_FILE
original_cache = page_registry._cache
page_registry.DATA_FILE = temp_page_status
page_registry._cache = None
app = create_app('testing')
app.config['TESTING'] = True
app.config['WTF_CSRF_ENABLED'] = False
yield app
page_registry.DATA_FILE = original_data_file
page_registry._cache = original_cache
app = create_app('testing')
app.config['TESTING'] = True
app.config['CSRF_ENABLED'] = False
yield app
page_registry.DATA_FILE = original_data_file
page_registry._cache = original_cache
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
def client(app):
"""Create test client."""
return app.test_client()
@pytest.fixture(autouse=True)
def clear_login_rate_limit():
"""Reset in-memory login attempts to avoid cross-test interference."""
auth_routes._login_attempts.clear()
yield
auth_routes._login_attempts.clear()
def _mock_admin_user(mail: str = "ymirliu@panjit.com.tw") -> dict:
return {
"username": "92367",
"displayName": "Test Admin",
"mail": mail,
"department": "Test Department",
}
def mock_ldap_success(mail="ymirliu@panjit.com.tw"):
"""Helper to create mock for successful LDAP auth."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"user": {
"username": "92367",
"displayName": "Test Admin",
"mail": mail,
"department": "Test Department"
}
}
return mock_response
class TestFullLoginLogoutFlow:
"""E2E tests for complete login/logout flow."""
@patch('mes_dashboard.services.auth_service.requests.post')
def test_complete_admin_login_workflow(self, mock_post, client):
"""Test complete admin login workflow."""
mock_post.return_value = mock_ldap_success()
class TestFullLoginLogoutFlow:
"""E2E tests for complete login/logout flow."""
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.routes.auth_routes.authenticate')
def test_complete_admin_login_workflow(self, mock_auth, _mock_is_admin, client):
"""Test complete admin login workflow."""
mock_auth.return_value = _mock_admin_user()
# 1. Access portal - should see login link
response = client.get("/")
@@ -131,7 +133,7 @@ class TestFullLoginLogoutFlow:
assert response.status_code == 302
class TestPageAccessControlFlow:
class TestPageAccessControlFlow:
"""E2E tests for page access control flow."""
def test_non_admin_cannot_access_dev_pages(self, client, temp_page_status):
@@ -146,10 +148,11 @@ class TestPageAccessControlFlow:
content = response.data.decode("utf-8")
assert "開發中" in content or "403" in content
@patch('mes_dashboard.services.auth_service.requests.post')
def test_admin_can_access_all_pages(self, mock_post, client, temp_page_status):
"""Test admin users can access all pages."""
mock_post.return_value = mock_ldap_success()
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.routes.auth_routes.authenticate')
def test_admin_can_access_all_pages(self, mock_auth, _mock_is_admin, client, temp_page_status):
"""Test admin users can access all pages."""
mock_auth.return_value = _mock_admin_user()
# 1. Login as admin
client.post("/admin/login", data={
@@ -166,13 +169,14 @@ class TestPageAccessControlFlow:
assert response.status_code != 403
class TestPageManagementFlow:
class TestPageManagementFlow:
"""E2E tests for page management flow."""
@patch('mes_dashboard.services.auth_service.requests.post')
def test_admin_can_change_page_status(self, mock_post, client, temp_page_status):
"""Test admin can change page status via management interface."""
mock_post.return_value = mock_ldap_success()
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.routes.auth_routes.authenticate')
def test_admin_can_change_page_status(self, mock_auth, _mock_is_admin, client, temp_page_status):
"""Test admin can change page status via management interface."""
mock_auth.return_value = _mock_admin_user()
# 1. Login as admin
client.post("/admin/login", data={
@@ -206,10 +210,11 @@ class TestPageManagementFlow:
response = client.get("/wip-overview")
assert response.status_code == 403
@patch('mes_dashboard.services.auth_service.requests.post')
def test_release_dev_page_makes_it_public(self, mock_post, client, temp_page_status):
"""Test releasing a dev page makes it publicly accessible."""
mock_post.return_value = mock_ldap_success()
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.routes.auth_routes.authenticate')
def test_release_dev_page_makes_it_public(self, mock_auth, _mock_is_admin, client, temp_page_status):
"""Test releasing a dev page makes it publicly accessible."""
mock_auth.return_value = _mock_admin_user()
# 1. Verify /tables is currently dev (403 for non-admin)
response = client.get("/tables")
@@ -238,7 +243,7 @@ class TestPageManagementFlow:
assert response.status_code != 403
class TestPortalDynamicTabs:
class TestPortalDynamicTabs:
"""E2E tests for dynamic portal tabs based on page status."""
def test_portal_hides_dev_tabs_for_non_admin(self, client, temp_page_status):
@@ -253,10 +258,11 @@ class TestPortalDynamicTabs:
# Dev pages should NOT show (tables and resource are dev)
# Note: This depends on the can_view_page implementation in portal.html
@patch('mes_dashboard.services.auth_service.requests.post')
def test_portal_shows_all_tabs_for_admin(self, mock_post, client, temp_page_status):
"""Test portal shows all tabs for admin users."""
mock_post.return_value = mock_ldap_success()
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.routes.auth_routes.authenticate')
def test_portal_shows_all_tabs_for_admin(self, mock_auth, _mock_is_admin, client, temp_page_status):
"""Test portal shows all tabs for admin users."""
mock_auth.return_value = _mock_admin_user()
# Login as admin
client.post("/admin/login", data={
@@ -272,13 +278,14 @@ class TestPortalDynamicTabs:
assert "WIP 即時概況" in content
class TestSessionPersistence:
class TestSessionPersistence:
"""E2E tests for session persistence."""
@patch('mes_dashboard.services.auth_service.requests.post')
def test_session_persists_across_requests(self, mock_post, client):
"""Test admin session persists across multiple requests."""
mock_post.return_value = mock_ldap_success()
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.routes.auth_routes.authenticate')
def test_session_persists_across_requests(self, mock_auth, _mock_is_admin, client):
"""Test admin session persists across multiple requests."""
mock_auth.return_value = _mock_admin_user()
# Login
client.post("/admin/login", data={
@@ -296,39 +303,34 @@ class TestSessionPersistence:
assert "admin" in sess
class TestSecurityScenarios:
class TestSecurityScenarios:
"""E2E tests for security scenarios."""
def test_cannot_access_admin_api_without_login(self, client):
"""Test admin APIs are protected."""
# Try to get pages without login
response = client.get("/admin/api/pages", follow_redirects=False)
assert response.status_code == 302
# Try to update page without login
response = client.put(
"/admin/api/pages/wip-overview",
data=json.dumps({"status": "dev"}),
content_type="application/json",
follow_redirects=False
)
assert response.status_code == 302
@patch('mes_dashboard.services.auth_service.requests.post')
def test_non_admin_user_cannot_login(self, mock_post, client):
"""Test non-admin user cannot access admin features."""
# Mock LDAP success but with non-admin email
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"user": {
"username": "99999",
"displayName": "Regular User",
"mail": "regular@panjit.com.tw",
"department": "Test"
}
}
mock_post.return_value = mock_response
def test_cannot_access_admin_api_without_login(self, client):
"""Test admin APIs are protected."""
# Try to get pages without login
response = client.get("/admin/api/pages", follow_redirects=False)
assert response.status_code in (302, 401)
# Try to update page without login
response = client.put(
"/admin/api/pages/wip-overview",
data=json.dumps({"status": "dev"}),
content_type="application/json",
follow_redirects=False
)
assert response.status_code in (302, 401)
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=False)
@patch('mes_dashboard.routes.auth_routes.authenticate')
def test_non_admin_user_cannot_login(self, mock_auth, _mock_is_admin, client):
"""Test non-admin user cannot access admin features."""
mock_auth.return_value = {
"username": "99999",
"displayName": "Regular User",
"mail": "regular@panjit.com.tw",
"department": "Test",
}
# Try to login
response = client.post("/admin/login", data={

View File

@@ -196,11 +196,11 @@ class TestSearchEndpointsE2E:
def test_search_workorders(self, api_base_url):
"""Test workorder search returns results."""
# Use a common pattern that should exist
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'workorder', 'q': 'WO', 'limit': 10},
timeout=30
)
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'field': 'workorder', 'q': 'WO', 'limit': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
@@ -208,11 +208,11 @@ class TestSearchEndpointsE2E:
def test_search_lotids(self, api_base_url):
"""Test lot ID search returns results."""
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'lotid', 'q': 'LOT', 'limit': 10},
timeout=30
)
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'field': 'lotid', 'q': 'LOT', 'limit': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
@@ -220,11 +220,11 @@ class TestSearchEndpointsE2E:
def test_search_with_short_query_returns_empty(self, api_base_url):
"""Test search with short query returns empty list."""
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'workorder', 'q': 'W'}, # Too short
timeout=30
)
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'field': 'workorder', 'q': 'W'}, # Too short
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())

View File

@@ -23,25 +23,36 @@ class TestPortalPage:
# Wait for page to load
expect(page.locator('h1')).to_contain_text('MES 報表入口')
def test_portal_has_all_tabs(self, page: Page, app_server: str):
"""Portal should have all navigation tabs."""
page.goto(app_server)
def test_portal_has_all_tabs(self, page: Page, app_server: str):
"""Portal should have all navigation tabs."""
page.goto(app_server)
# Check released tabs exist
expect(page.locator('.tab:has-text("WIP 即時概況")')).to_be_visible()
expect(page.locator('.tab:has-text("設備即時概況")')).to_be_visible()
expect(page.locator('.tab:has-text("設備歷史績效")')).to_be_visible()
expect(page.locator('.tab:has-text("設備維修查詢")')).to_be_visible()
expect(page.locator('.tab:has-text("批次追蹤工具")')).to_be_visible()
# Check all tabs exist
expect(page.locator('.tab:has-text("WIP 即時概況")')).to_be_visible()
expect(page.locator('.tab:has-text("機台狀態報表")')).to_be_visible()
expect(page.locator('.tab:has-text("數據表查詢工具")')).to_be_visible()
expect(page.locator('.tab:has-text("Excel 批次查詢")')).to_be_visible()
def test_portal_tab_switching(self, page: Page, app_server: str):
"""Portal tabs should switch iframe content."""
page.goto(app_server)
# Click on a different tab
page.locator('.tab:has-text("機台狀態報表")').click()
# Verify the tab is active
expect(page.locator('.tab:has-text("機台狀態報表")')).to_have_class(re.compile(r'active'))
def test_portal_tab_switching(self, page: Page, app_server: str):
"""Portal tabs should switch iframe content."""
page.goto(app_server)
# Click on a different tab
page.locator('.tab:has-text("設備即時概況")').click()
# Verify the tab is active
expect(page.locator('.tab:has-text("設備即時概況")')).to_have_class(re.compile(r'active'))
def test_portal_health_popup_clickable(self, page: Page, app_server: str):
"""Health status pill should toggle popup visibility on click."""
page.goto(app_server)
popup = page.locator('#healthPopup')
expect(popup).not_to_have_class(re.compile(r'show'))
page.locator('#healthStatus').click()
expect(popup).to_have_class(re.compile(r'show'))
@pytest.mark.e2e
@@ -240,11 +251,16 @@ class TestWIPDetailPage:
class TestTablesPage:
"""E2E tests for Tables page."""
def test_tables_page_loads(self, page: Page, app_server: str):
"""Tables page should load."""
page.goto(f"{app_server}/tables")
expect(page.locator('h1')).to_contain_text('MES 數據表查詢工具')
def test_tables_page_loads(self, page: Page, app_server: str):
"""Tables page should load."""
page.goto(f"{app_server}/tables")
header = page.locator('h1')
expect(header).to_be_visible()
text = header.inner_text()
assert (
'MES 數據表查詢工具' in text
or '頁面開發中' in text
)
def test_tables_has_toast_system(self, page: Page, app_server: str):
"""Tables page should have Toast system loaded."""

View File

@@ -5,11 +5,11 @@ These tests simulate real user workflows through the resource history analysis f
Run with: pytest tests/e2e/test_resource_history_e2e.py -v --run-integration
"""
import json
import pytest
from unittest.mock import patch, MagicMock
import pandas as pd
from datetime import datetime, timedelta
import json
import pytest
from unittest.mock import patch
import pandas as pd
from datetime import datetime
import sys
import os
@@ -94,19 +94,20 @@ class TestResourceHistoryPageAccess:
assert 'exportBtn' in content
class TestResourceHistoryAPIWorkflow:
"""E2E tests for API workflows."""
@patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
@patch('mes_dashboard.services.filter_cache.get_resource_families')
def test_filter_options_workflow(self, mock_families, mock_groups, client):
"""Filter options should be loadable."""
mock_groups.return_value = [
{'name': '焊接_DB', 'sequence': 1},
{'name': '焊接_WB', 'sequence': 2},
{'name': '成型', 'sequence': 4},
]
mock_families.return_value = ['FAM001', 'FAM002']
class TestResourceHistoryAPIWorkflow:
"""E2E tests for API workflows."""
@patch('mes_dashboard.services.resource_history_service.get_filter_options')
def test_filter_options_workflow(self, mock_get_filter_options, client):
"""Filter options should be loadable."""
mock_get_filter_options.return_value = {
'workcenter_groups': [
{'name': '焊接_DB', 'sequence': 1},
{'name': '焊接_WB', 'sequence': 2},
{'name': '成型', 'sequence': 4},
],
'families': ['FAM001', 'FAM002'],
}
response = client.get('/api/resource/history/options')
@@ -116,15 +117,31 @@ class TestResourceHistoryAPIWorkflow:
assert 'workcenter_groups' in data['data']
assert 'families' in data['data']
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_complete_query_workflow(self, mock_read_sql, client):
"""Complete query workflow should return all data sections."""
# Mock responses for the 4 queries in query_summary
kpi_df = pd.DataFrame([{
'PRD_HOURS': 8000, 'SBY_HOURS': 1000, 'UDT_HOURS': 500,
'SDT_HOURS': 300, 'EGT_HOURS': 200, 'NST_HOURS': 1000,
'MACHINE_COUNT': 100
}])
@patch('mes_dashboard.services.resource_history_service._get_filtered_resources')
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_complete_query_workflow(self, mock_read_sql, mock_resources, client):
"""Complete query workflow should return all data sections."""
mock_resources.return_value = [
{
'RESOURCEID': 'RES001',
'WORKCENTERNAME': '焊接_DB',
'RESOURCEFAMILYNAME': 'FAM001',
'RESOURCENAME': 'RES001',
},
{
'RESOURCEID': 'RES002',
'WORKCENTERNAME': '成型',
'RESOURCEFAMILYNAME': 'FAM002',
'RESOURCENAME': 'RES002',
},
]
# Mock responses for the 3 queries in query_summary
kpi_df = pd.DataFrame([{
'PRD_HOURS': 8000, 'SBY_HOURS': 1000, 'UDT_HOURS': 500,
'SDT_HOURS': 300, 'EGT_HOURS': 200, 'NST_HOURS': 1000,
'MACHINE_COUNT': 100
}])
trend_df = pd.DataFrame([
{'DATA_DATE': datetime(2024, 1, 1), 'PRD_HOURS': 1000, 'SBY_HOURS': 100,
@@ -133,33 +150,24 @@ class TestResourceHistoryAPIWorkflow:
'UDT_HOURS': 40, 'SDT_HOURS': 25, 'EGT_HOURS': 15, 'NST_HOURS': 100, 'MACHINE_COUNT': 100},
])
heatmap_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 400, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10},
{'WORKCENTERNAME': '成型', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 600, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10},
])
comparison_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'PRD_HOURS': 4000, 'SBY_HOURS': 500,
'UDT_HOURS': 250, 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'MACHINE_COUNT': 50},
{'WORKCENTERNAME': '成型', 'PRD_HOURS': 4000, 'SBY_HOURS': 500,
'UDT_HOURS': 250, 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'MACHINE_COUNT': 50},
])
# Use function-based side_effect for ThreadPoolExecutor parallel queries
def mock_sql(sql):
sql_upper = sql.upper()
if 'DATA_DATE' in sql_upper and 'WORKCENTERNAME' in sql_upper:
return heatmap_df
elif 'DATA_DATE' in sql_upper:
return trend_df
elif 'WORKCENTERNAME' in sql_upper:
return comparison_df
else:
return kpi_df
mock_read_sql.side_effect = mock_sql
heatmap_raw_df = pd.DataFrame([
{'HISTORYID': 'RES001', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 400, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10, 'NST_HOURS': 20},
{'HISTORYID': 'RES002', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 600, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10, 'NST_HOURS': 30},
])
# Use function-based side_effect for ThreadPoolExecutor parallel queries
def mock_sql(sql, _params=None):
sql_upper = sql.upper()
if 'HISTORYID' in sql_upper and 'DATA_DATE' in sql_upper:
return heatmap_raw_df
elif 'DATA_DATE' in sql_upper:
return trend_df
else:
return kpi_df
mock_read_sql.side_effect = mock_sql
response = client.get(
'/api/resource/history/summary'
@@ -183,23 +191,39 @@ class TestResourceHistoryAPIWorkflow:
# Trend should also have availability_pct
assert 'availability_pct' in data['data']['trend'][0]
# Verify heatmap
assert len(data['data']['heatmap']) == 2
# Verify comparison
assert len(data['data']['workcenter_comparison']) == 2
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_detail_query_workflow(self, mock_read_sql, client):
"""Detail query workflow should return hierarchical data."""
detail_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES002',
'PRD_HOURS': 75, 'SBY_HOURS': 15, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
# Verify heatmap
assert len(data['data']['heatmap']) == 2
# Verify comparison
assert len(data['data']['workcenter_comparison']) == 2
@patch('mes_dashboard.services.resource_history_service._get_filtered_resources')
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_detail_query_workflow(self, mock_read_sql, mock_resources, client):
"""Detail query workflow should return hierarchical data."""
mock_resources.return_value = [
{
'RESOURCEID': 'RES001',
'WORKCENTERNAME': '焊接_DB',
'RESOURCEFAMILYNAME': 'FAM001',
'RESOURCENAME': 'RES001',
},
{
'RESOURCEID': 'RES002',
'WORKCENTERNAME': '焊接_DB',
'RESOURCEFAMILYNAME': 'FAM001',
'RESOURCENAME': 'RES002',
},
]
detail_df = pd.DataFrame([
{'HISTORYID': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
{'HISTORYID': 'RES002',
'PRD_HOURS': 75, 'SBY_HOURS': 15, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
mock_read_sql.return_value = detail_df
@@ -226,14 +250,23 @@ class TestResourceHistoryAPIWorkflow:
assert 'prd_hours' in first_row
assert 'prd_pct' in first_row
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_export_workflow(self, mock_read_sql, client):
"""Export workflow should return valid CSV."""
mock_read_sql.return_value = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
@patch('mes_dashboard.services.resource_history_service._get_filtered_resources')
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_export_workflow(self, mock_read_sql, mock_resources, client):
"""Export workflow should return valid CSV."""
mock_resources.return_value = [
{
'RESOURCEID': 'RES001',
'WORKCENTERNAME': '焊接_DB',
'RESOURCEFAMILYNAME': 'FAM001',
'RESOURCENAME': 'RES001',
}
]
mock_read_sql.return_value = pd.DataFrame([
{'HISTORYID': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
response = client.get(
'/api/resource/history/export'
@@ -281,21 +314,47 @@ class TestResourceHistoryValidation:
data = json.loads(response.data)
assert data['success'] is False
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_granularity_options(self, mock_read_sql, client):
"""Different granularity options should work."""
mock_df = pd.DataFrame([{
'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'MACHINE_COUNT': 5
}])
mock_read_sql.return_value = mock_df
for granularity in ['day', 'week', 'month', 'year']:
mock_read_sql.side_effect = [mock_df, pd.DataFrame(), pd.DataFrame(), pd.DataFrame()]
response = client.get(
f'/api/resource/history/summary'
f'?start_date=2024-01-01'
@patch('mes_dashboard.services.resource_history_service._get_filtered_resources')
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_granularity_options(self, mock_read_sql, mock_resources, client):
"""Different granularity options should work."""
mock_resources.return_value = [{
'RESOURCEID': 'RES001',
'WORKCENTERNAME': '焊接_DB',
'RESOURCEFAMILYNAME': 'FAM001',
'RESOURCENAME': 'RES001',
}]
kpi_df = pd.DataFrame([{
'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'MACHINE_COUNT': 5
}])
trend_df = pd.DataFrame([{
'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10,
'MACHINE_COUNT': 5
}])
heatmap_raw_df = pd.DataFrame([{
'HISTORYID': 'RES001',
'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10
}])
for granularity in ['day', 'week', 'month', 'year']:
def mock_sql(sql, _params=None):
sql_upper = sql.upper()
if 'HISTORYID' in sql_upper and 'DATA_DATE' in sql_upper:
return heatmap_raw_df
if 'DATA_DATE' in sql_upper:
return trend_df
return kpi_df
mock_read_sql.side_effect = mock_sql
response = client.get(
f'/api/resource/history/summary'
f'?start_date=2024-01-01'
f'&end_date=2024-01-31'
f'&granularity={granularity}'
)

View File

@@ -10,7 +10,7 @@ class AppFactoryTests(unittest.TestCase):
db._ENGINE = None
def test_create_app_default_config(self):
app = create_app()
app = create_app("development")
self.assertTrue(app.config.get("DEBUG"))
self.assertEqual(app.config.get("ENV"), "development")
cache = app.extensions.get("cache")
@@ -20,8 +20,11 @@ class AppFactoryTests(unittest.TestCase):
def test_create_app_production_config(self):
old_secret = os.environ.get("SECRET_KEY")
old_conda_env_name = os.environ.get("CONDA_ENV_NAME")
try:
os.environ["SECRET_KEY"] = "test-production-secret-key"
# Keep runtime-contract strict validation aligned with active env.
os.environ["CONDA_ENV_NAME"] = os.environ.get("CONDA_DEFAULT_ENV", "base")
app = create_app("production")
self.assertFalse(app.config.get("DEBUG"))
self.assertEqual(app.config.get("ENV"), "production")
@@ -30,15 +33,19 @@ class AppFactoryTests(unittest.TestCase):
os.environ.pop("SECRET_KEY", None)
else:
os.environ["SECRET_KEY"] = old_secret
if old_conda_env_name is None:
os.environ.pop("CONDA_ENV_NAME", None)
else:
os.environ["CONDA_ENV_NAME"] = old_conda_env_name
def test_create_app_independent_instances(self):
app1 = create_app()
app1 = create_app("development")
db._ENGINE = None
app2 = create_app()
app2 = create_app("development")
self.assertIsNot(app1, app2)
def test_routes_registered(self):
app = create_app()
app = create_app("development")
rules = {rule.rule for rule in app.url_map.iter_rules()}
expected = {
"/",
@@ -47,6 +54,8 @@ class AppFactoryTests(unittest.TestCase):
"/wip-overview",
"/wip-detail",
"/excel-query",
"/query-tool",
"/tmtt-defect",
"/api/wip/overview/summary",
"/api/wip/overview/matrix",
"/api/wip/overview/hold",
@@ -56,6 +65,8 @@ class AppFactoryTests(unittest.TestCase):
"/api/resource/status/summary",
"/api/dashboard/kpi",
"/api/excel-query/upload",
"/api/query-tool/resolve",
"/api/tmtt-defect/analysis",
}
missing = expected - rules
self.assertFalse(missing, f"Missing routes: {sorted(missing)}")

View File

@@ -105,14 +105,15 @@ class TestWipApiWithCache:
@pytest.fixture
def mock_wip_cache_data(self):
"""Create mock WIP data for cache."""
return pd.DataFrame({
'LOTID': ['LOT001', 'LOT002', 'LOT003'],
'QTY': [100, 200, 150],
'WORKORDER': ['WO001', 'WO002', 'WO003'],
'WORKCENTER_GROUP': ['WC1', 'WC1', 'WC2'],
'WORKCENTERSEQUENCE_GROUP': [1, 1, 2],
'PRODUCTLINENAME': ['PKG1', 'PKG2', 'PKG1'],
'EQUIPMENTCOUNT': [1, 0, 0],
return pd.DataFrame({
'LOTID': ['LOT001', 'LOT002', 'LOT003'],
'QTY': [100, 200, 150],
'WORKORDER': ['WO001', 'WO002', 'WO003'],
'WORKCENTER_GROUP': ['WC1', 'WC1', 'WC2'],
'WORKCENTERSEQUENCE_GROUP': [1, 1, 2],
'PACKAGE_LEF': ['PKG1', 'PKG2', 'PKG1'],
'PRODUCTLINENAME': ['PKG1', 'PKG2', 'PKG1'],
'EQUIPMENTCOUNT': [1, 0, 0],
'CURRENTHOLDCOUNT': [0, 1, 0],
'HOLDREASONNAME': [None, 'Quality Issue', None],
'STATUS': ['ACTIVE', 'HOLD', 'ACTIVE'],

View File

@@ -384,5 +384,8 @@ class TestPerformancePage:
# Should be 200 for authenticated admin
assert response.status_code == 200
# Check for performance-related content
data_str = response.data.decode('utf-8', errors='ignore').lower()
html = response.data.decode('utf-8', errors='ignore')
data_str = html.lower()
assert 'performance' in data_str or '效能' in data_str
assert '/static/js/chart.umd.min.js' in html
assert 'cdn.jsdelivr.net' not in html

View File

@@ -0,0 +1,645 @@
# -*- coding: utf-8 -*-
"""Integration tests for Query Tool API routes.
Tests the API endpoints with mocked service dependencies:
- Input validation (empty, over limit, invalid format)
- Success responses
- Error handling
"""
import pytest
import json
from unittest.mock import patch, MagicMock
from mes_dashboard import create_app
@pytest.fixture
def app():
"""Create test Flask application."""
app = create_app()
app.config['TESTING'] = True
return app
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
class TestQueryToolPage:
"""Tests for /query-tool page route."""
def test_page_returns_html(self, client):
"""Should return the query tool page."""
response = client.get('/query-tool')
assert response.status_code == 200
assert b'html' in response.data.lower()
class TestResolveEndpoint:
"""Tests for /api/query-tool/resolve endpoint."""
def test_missing_input_type(self, client):
"""Should return error without input_type."""
response = client.post(
'/api/query-tool/resolve',
json={
'values': ['GA23100020-A00-001']
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_missing_values(self, client):
"""Should return error without values."""
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_empty_values(self, client):
"""Should return error for empty values list."""
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': []
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_values_over_limit(self, client):
"""Should reject values exceeding limit."""
# More than MAX_LOT_IDS (50)
values = [f'GA{i:09d}' for i in range(51)]
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': values
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '超過上限' in data['error'] or '50' in data['error']
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_success(self, mock_resolve, client):
"""Should return resolved LOT IDs on success."""
mock_resolve.return_value = {
'data': [
{
'container_id': '488103800029578b',
'lot_id': 'GA23100020-A00-001',
'input_value': 'GA23100020-A00-001',
'spec_name': 'SPEC-001'
}
],
'total': 1,
'input_count': 1,
'not_found': []
}
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': ['GA23100020-A00-001']
}
)
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert data['total'] == 1
assert data['data'][0]['lot_id'] == 'GA23100020-A00-001'
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_not_found(self, mock_resolve, client):
    """Unresolvable inputs show up in the not_found list, still HTTP 200."""
    mock_resolve.return_value = {
        'data': [],
        'total': 0,
        'input_count': 1,
        'not_found': ['INVALID-LOT-ID'],
    }
    resp = client.post(
        '/api/query-tool/resolve',
        json={'input_type': 'lot_id', 'values': ['INVALID-LOT-ID']},
    )
    assert resp.status_code == 200
    payload = json.loads(resp.data)
    assert payload['total'] == 0
    assert 'INVALID-LOT-ID' in payload['not_found']
class TestLotHistoryEndpoint:
    """Tests for /api/query-tool/lot-history endpoint."""

    def test_missing_container_id(self, client):
        """Requests without container_id should be rejected with 400."""
        resp = client.get('/api/query-tool/lot-history')
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    def test_lot_history_success(self, mock_query, client):
        """A valid container_id returns the service's history rows."""
        history_row = {
            'CONTAINERID': '488103800029578b',
            'EQUIPMENTNAME': 'ASSY-01',
            'SPECNAME': 'SPEC-001',
            'TRACKINTIMESTAMP': '2024-01-15 10:30:00',
            'TRACKOUTTIMESTAMP': '2024-01-15 11:00:00',
        }
        mock_query.return_value = {'data': [history_row], 'total': 1}
        resp = client.get('/api/query-tool/lot-history?container_id=488103800029578b')
        assert resp.status_code == 200
        payload = json.loads(resp.data)
        assert 'data' in payload
        assert payload['total'] == 1

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    def test_lot_history_service_error(self, mock_query, client):
        """Service-level errors propagate as a 400 error payload."""
        mock_query.return_value = {'error': '查詢失敗'}
        resp = client.get('/api/query-tool/lot-history?container_id=invalid')
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload
class TestAdjacentLotsEndpoint:
    """Tests for /api/query-tool/adjacent-lots endpoint."""

    def test_missing_equipment_id(self, client):
        """A request carrying only target_time is rejected."""
        resp = client.get(
            '/api/query-tool/adjacent-lots?target_time=2024-01-15T10:30:00'
        )
        assert resp.status_code == 400
        assert 'error' in json.loads(resp.data)

    def test_missing_target_time(self, client):
        """A request carrying only equipment_id is rejected."""
        resp = client.get('/api/query-tool/adjacent-lots?equipment_id=EQ001')
        assert resp.status_code == 400
        assert 'error' in json.loads(resp.data)

    def test_with_only_equipment_id(self, client):
        """Equipment ID alone (no target_time) is rejected.

        NOTE(review): this is an exact duplicate of
        test_missing_target_time; consider merging or giving it a
        distinct scenario.
        """
        resp = client.get('/api/query-tool/adjacent-lots?equipment_id=EQ001')
        assert resp.status_code == 400
        assert 'error' in json.loads(resp.data)

    @patch('mes_dashboard.routes.query_tool_routes.get_adjacent_lots')
    def test_adjacent_lots_success(self, mock_query, client):
        """Neighbouring lots come back with their relative positions."""
        neighbours = [
            {
                'CONTAINERID': '488103800029578a',
                'CONTAINERNAME': 'GA23100020-A00-000',
                'relative_position': -1,
            },
            {
                'CONTAINERID': '488103800029578b',
                'CONTAINERNAME': 'GA23100020-A00-001',
                'relative_position': 0,
            },
            {
                'CONTAINERID': '488103800029578c',
                'CONTAINERNAME': 'GA23100020-A00-002',
                'relative_position': 1,
            },
        ]
        mock_query.return_value = {'data': neighbours, 'total': 3}
        resp = client.get(
            '/api/query-tool/adjacent-lots?'
            'equipment_id=EQ001&target_time=2024-01-15T10:30:00'
        )
        assert resp.status_code == 200
        payload = json.loads(resp.data)
        assert 'data' in payload
        assert payload['total'] == 3
        # The route must call the service positionally, without spec_name.
        mock_query.assert_called_once()
        positional = mock_query.call_args[0]
        assert positional[0] == 'EQ001'  # equipment_id
        assert '2024-01-15' in positional[1]  # target_time
class TestLotAssociationsEndpoint:
    """Tests for /api/query-tool/lot-associations endpoint."""

    def test_missing_container_id(self, client):
        """Requests lacking container_id should be rejected with 400."""
        resp = client.get('/api/query-tool/lot-associations?type=materials')
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload

    def test_missing_type(self, client):
        """Requests lacking the association type should be rejected."""
        resp = client.get('/api/query-tool/lot-associations?container_id=488103800029578b')
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload

    def test_invalid_type(self, client):
        """An unknown association type should produce an explanatory error."""
        resp = client.get(
            '/api/query-tool/lot-associations?container_id=488103800029578b&type=invalid'
        )
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload
        assert '不支援' in payload['error'] or 'type' in payload['error'].lower()

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_materials')
    def test_lot_materials_success(self, mock_query, client):
        """A valid materials query returns the service rows untouched."""
        material_row = {
            'MATERIALTYPE': 'TypeA',
            'MATERIALNAME': 'Material-001',
            'QTY': 100,
        }
        mock_query.return_value = {'data': [material_row], 'total': 1}
        resp = client.get(
            '/api/query-tool/lot-associations?container_id=488103800029578b&type=materials'
        )
        assert resp.status_code == 200
        payload = json.loads(resp.data)
        assert 'data' in payload
        assert payload['total'] == 1
class TestEquipmentPeriodEndpoint:
    """Tests for /api/query-tool/equipment-period endpoint."""

    @staticmethod
    def _post(client, payload):
        """POST *payload* as JSON to the equipment-period endpoint."""
        return client.post('/api/query-tool/equipment-period', json=payload)

    def test_missing_query_type(self, client):
        """Omitting query_type yields a 400 naming the query type."""
        resp = self._post(client, {
            'equipment_ids': ['EQ001'],
            'start_date': '2024-01-01',
            'end_date': '2024-01-31',
        })
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload
        assert '查詢類型' in payload['error'] or 'type' in payload['error'].lower()

    def test_empty_equipment_ids(self, client):
        """An empty equipment list is rejected."""
        resp = self._post(client, {
            'equipment_ids': [],
            'start_date': '2024-01-01',
            'end_date': '2024-01-31',
            'query_type': 'status_hours',
        })
        assert resp.status_code == 400
        assert 'error' in json.loads(resp.data)

    def test_missing_start_date(self, client):
        """Omitting start_date is rejected."""
        resp = self._post(client, {
            'equipment_ids': ['EQ001'],
            'end_date': '2024-01-31',
            'query_type': 'status_hours',
        })
        assert resp.status_code == 400
        assert 'error' in json.loads(resp.data)

    def test_missing_end_date(self, client):
        """Omitting end_date is rejected."""
        resp = self._post(client, {
            'equipment_ids': ['EQ001'],
            'start_date': '2024-01-01',
            'query_type': 'status_hours',
        })
        assert resp.status_code == 400
        assert 'error' in json.loads(resp.data)

    def test_invalid_date_range(self, client):
        """An end date earlier than the start date is rejected."""
        resp = self._post(client, {
            'equipment_ids': ['EQ001'],
            'start_date': '2024-12-31',
            'end_date': '2024-01-01',
            'query_type': 'status_hours',
        })
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload
        assert '結束日期' in payload['error'] or '早於' in payload['error']

    def test_date_range_exceeds_limit(self, client):
        """A span longer than 90 days is rejected and names the limit."""
        resp = self._post(client, {
            'equipment_ids': ['EQ001'],
            'start_date': '2024-01-01',
            'end_date': '2024-06-01',
            'query_type': 'status_hours',
        })
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload
        assert '90' in payload['error']

    def test_invalid_query_type(self, client):
        """An unsupported query_type is rejected."""
        resp = self._post(client, {
            'equipment_ids': ['EQ001'],
            'start_date': '2024-01-01',
            'end_date': '2024-01-31',
            'query_type': 'invalid_type',
        })
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload
        assert '查詢類型' in payload['error'] or 'type' in payload['error'].lower()

    @patch('mes_dashboard.routes.query_tool_routes.get_equipment_status_hours')
    def test_equipment_status_hours_success(self, mock_status, client):
        """A valid status_hours query returns the service result."""
        mock_status.return_value = {'data': [], 'total': 0}
        resp = self._post(client, {
            'equipment_ids': ['EQ001'],
            'start_date': '2024-01-01',
            'end_date': '2024-01-31',
            'query_type': 'status_hours',
        })
        assert resp.status_code == 200
        assert 'data' in json.loads(resp.data)
class TestExportCsvEndpoint:
    """Tests for /api/query-tool/export-csv endpoint."""

    def test_missing_export_type(self, client):
        """Omitting export_type should yield a 400 error payload."""
        resp = client.post(
            '/api/query-tool/export-csv',
            json={'params': {'container_id': '488103800029578b'}},
        )
        assert resp.status_code == 400
        assert 'error' in json.loads(resp.data)

    def test_invalid_export_type(self, client):
        """An unsupported export_type should produce an explanatory error."""
        resp = client.post(
            '/api/query-tool/export-csv',
            json={'export_type': 'invalid_type', 'params': {}},
        )
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'error' in payload
        assert '不支援' in payload['error'] or 'type' in payload['error'].lower()

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    def test_export_lot_history_success(self, mock_get_history, client):
        """Exporting lot history streams back a CSV response."""
        mock_get_history.return_value = {
            'data': [{
                'EQUIPMENTNAME': 'ASSY-01',
                'SPECNAME': 'SPEC-001',
                'TRACKINTIMESTAMP': '2024-01-15 10:00:00',
            }],
            'total': 1,
        }
        resp = client.post(
            '/api/query-tool/export-csv',
            json={
                'export_type': 'lot_history',
                'params': {'container_id': '488103800029578b'},
            },
        )
        assert resp.status_code == 200
        assert 'text/csv' in resp.content_type
class TestEquipmentListEndpoint:
    """Tests for /api/query-tool/equipment-list endpoint."""

    @patch('mes_dashboard.services.resource_cache.get_all_resources')
    def test_get_equipment_list_success(self, mock_get_resources, client):
        """A populated resource cache yields the full equipment list."""
        mock_get_resources.return_value = [
            {
                'RESOURCEID': 'EQ001',
                'RESOURCENAME': 'ASSY-01',
                'WORKCENTERNAME': 'WC-A',
                'RESOURCEFAMILYNAME': 'FAM-01',
            },
            {
                'RESOURCEID': 'EQ002',
                'RESOURCENAME': 'ASSY-02',
                'WORKCENTERNAME': 'WC-B',
                'RESOURCEFAMILYNAME': 'FAM-02',
            },
        ]
        resp = client.get('/api/query-tool/equipment-list')
        assert resp.status_code == 200
        payload = json.loads(resp.data)
        assert 'data' in payload
        assert 'total' in payload
        assert payload['total'] == 2

    @patch('mes_dashboard.services.resource_cache.get_all_resources')
    def test_get_equipment_list_empty(self, mock_get_resources, client):
        """An empty cache is treated as a server-side failure (500)."""
        mock_get_resources.return_value = []
        resp = client.get('/api/query-tool/equipment-list')
        assert resp.status_code == 500
        assert 'error' in json.loads(resp.data)

    @patch('mes_dashboard.services.resource_cache.get_all_resources')
    def test_get_equipment_list_exception(self, mock_get_resources, client):
        """Cache exceptions surface as a 500 error payload."""
        mock_get_resources.side_effect = Exception('Database error')
        resp = client.get('/api/query-tool/equipment-list')
        assert resp.status_code == 500
        assert 'error' in json.loads(resp.data)
class TestWorkcenterGroupsEndpoint:
    """Tests for /api/query-tool/workcenter-groups endpoint."""

    @patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
    def test_returns_groups_list(self, mock_get_groups, client):
        """A healthy cache yields the ordered group list."""
        mock_get_groups.return_value = [
            {'name': 'DB', 'sequence': 1},
            {'name': 'WB', 'sequence': 2},
        ]
        resp = client.get('/api/query-tool/workcenter-groups')
        assert resp.status_code == 200
        payload = json.loads(resp.data)
        assert 'data' in payload
        assert len(payload['data']) == 2
        assert payload['total'] == 2

    @patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
    def test_handles_cache_failure(self, mock_get_groups, client):
        """A None cache result maps to a 500 error payload."""
        mock_get_groups.return_value = None
        resp = client.get('/api/query-tool/workcenter-groups')
        assert resp.status_code == 500
        assert 'error' in json.loads(resp.data)

    @patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
    def test_handles_exception(self, mock_get_groups, client):
        """Cache exceptions surface as a 500 error payload."""
        mock_get_groups.side_effect = Exception('Cache error')
        resp = client.get('/api/query-tool/workcenter-groups')
        assert resp.status_code == 500
        assert 'error' in json.loads(resp.data)
class TestLotHistoryWithWorkcenterFilter:
    """Tests for /api/query-tool/lot-history with workcenter filter."""

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    def test_accepts_workcenter_groups_param(self, mock_query, client):
        """The comma-separated query param is split and forwarded."""
        mock_query.return_value = {
            'data': [],
            'total': 0,
            'filtered_by_groups': ['DB', 'WB'],
        }
        resp = client.get(
            '/api/query-tool/lot-history?'
            'container_id=abc123&workcenter_groups=DB,WB'
        )
        assert resp.status_code == 200
        # The route must forward the parsed groups as a keyword argument.
        _, kwargs = mock_query.call_args
        assert kwargs.get('workcenter_groups') == ['DB', 'WB']

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    def test_empty_workcenter_groups_ignored(self, mock_query, client):
        """An empty param value is forwarded as None, not an empty list."""
        mock_query.return_value = {
            'data': [],
            'total': 0,
            'filtered_by_groups': [],
        }
        resp = client.get(
            '/api/query-tool/lot-history?'
            'container_id=abc123&workcenter_groups='
        )
        assert resp.status_code == 200
        _, kwargs = mock_query.call_args
        assert kwargs.get('workcenter_groups') is None

    @patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
    def test_returns_filtered_by_groups_in_response(self, mock_query, client):
        """The service's filtered_by_groups list is echoed in the response."""
        mock_query.return_value = {
            'data': [{'CONTAINERID': 'abc123'}],
            'total': 1,
            'filtered_by_groups': ['DB'],
        }
        resp = client.get(
            '/api/query-tool/lot-history?'
            'container_id=abc123&workcenter_groups=DB'
        )
        assert resp.status_code == 200
        assert json.loads(resp.data).get('filtered_by_groups') == ['DB']

View File

@@ -0,0 +1,420 @@
# -*- coding: utf-8 -*-
"""Unit tests for Query Tool service functions.
Tests the core service functions without database dependencies:
- Input validation (LOT, equipment, date range)
- IN clause building helpers
- Constants validation
"""
import pytest
from mes_dashboard.services.query_tool_service import (
validate_date_range,
validate_lot_input,
validate_equipment_input,
_build_in_clause,
_build_in_filter,
BATCH_SIZE,
MAX_LOT_IDS,
MAX_SERIAL_NUMBERS,
MAX_WORK_ORDERS,
MAX_EQUIPMENTS,
MAX_DATE_RANGE_DAYS,
)
class TestValidateDateRange:
    """Tests for validate_date_range.

    The validator returns None for a valid range and an error message
    (Chinese text) otherwise, so tests assert None / message substrings.
    """

    def test_valid_range(self):
        """A range well inside the limit is accepted."""
        assert validate_date_range('2024-01-01', '2024-01-31') is None

    def test_same_day(self):
        """Start == end is a legal zero-length range."""
        assert validate_date_range('2024-01-01', '2024-01-01') is None

    def test_end_before_start(self):
        """An end date earlier than the start date is rejected."""
        result = validate_date_range('2024-12-31', '2024-01-01')
        assert result is not None
        assert '結束日期' in result or '早於' in result

    def test_exceeds_max_range(self):
        """A range far beyond the limit is rejected and names the limit."""
        result = validate_date_range('2023-01-01', '2024-12-31')
        assert result is not None
        assert str(MAX_DATE_RANGE_DAYS) in result

    def test_exactly_max_range(self):
        """Exactly MAX_DATE_RANGE_DAYS days is still accepted."""
        # 2024-03-31 minus 2024-01-01 is exactly 90 days (2024 is a leap year).
        assert validate_date_range('2024-01-01', '2024-03-31') is None

    def test_one_day_over_max_range(self):
        """Exactly one day over the limit is rejected.

        Fixed: the original used 2024-04-02 (92 days after 2024-01-01)
        despite its "91 days" comment; 2024-04-01 is the true 91-day
        boundary, making this a proper boundary test.
        """
        result = validate_date_range('2024-01-01', '2024-04-01')  # 91 days
        assert result is not None
        assert str(MAX_DATE_RANGE_DAYS) in result

    def test_invalid_date_format(self):
        """Non-ISO ordering (MM-DD-YYYY) is rejected as a format error."""
        result = validate_date_range('01-01-2024', '12-31-2024')
        assert result is not None
        assert '格式' in result or 'format' in result.lower()

    def test_invalid_start_date(self):
        """An impossible start month (13) is rejected as a format error."""
        result = validate_date_range('2024-13-01', '2024-12-31')
        assert result is not None
        assert '格式' in result or 'format' in result.lower()

    def test_invalid_end_date(self):
        """An impossible end day (Feb 30) is rejected as a format error."""
        result = validate_date_range('2024-01-01', '2024-02-30')
        assert result is not None
        assert '格式' in result or 'format' in result.lower()

    def test_non_date_string(self):
        """Arbitrary non-date text is rejected as a format error."""
        result = validate_date_range('abc', 'def')
        assert result is not None
        assert '格式' in result or 'format' in result.lower()
class TestValidateLotInput:
    """Tests for validate_lot_input function."""

    def test_valid_lot_ids(self):
        """LOT IDs within the limit validate cleanly (None means valid)."""
        lot_ids = ['GA23100020-A00-001', 'GA23100020-A00-002']
        assert validate_lot_input('lot_id', lot_ids) is None

    def test_valid_serial_numbers(self):
        """Serial numbers within the limit validate cleanly."""
        assert validate_lot_input('serial_number', ['SN001', 'SN002', 'SN003']) is None

    def test_valid_work_orders(self):
        """Work orders within the limit validate cleanly."""
        assert validate_lot_input('work_order', ['GA231000001']) is None

    def test_empty_values(self):
        """An empty value list is rejected."""
        message = validate_lot_input('lot_id', [])
        assert message is not None
        assert '至少一個' in message

    def test_exceeds_lot_id_limit(self):
        """One LOT ID over the limit is rejected and names the cap."""
        over_limit = [f'GA{i:09d}' for i in range(MAX_LOT_IDS + 1)]
        message = validate_lot_input('lot_id', over_limit)
        assert message is not None
        assert '超過上限' in message
        assert str(MAX_LOT_IDS) in message

    def test_exceeds_serial_number_limit(self):
        """One serial number over the limit is rejected."""
        over_limit = [f'SN{i:06d}' for i in range(MAX_SERIAL_NUMBERS + 1)]
        message = validate_lot_input('serial_number', over_limit)
        assert message is not None
        assert '超過上限' in message
        assert str(MAX_SERIAL_NUMBERS) in message

    def test_exceeds_work_order_limit(self):
        """One work order over the limit is rejected."""
        over_limit = [f'WO{i:06d}' for i in range(MAX_WORK_ORDERS + 1)]
        message = validate_lot_input('work_order', over_limit)
        assert message is not None
        assert '超過上限' in message
        assert str(MAX_WORK_ORDERS) in message

    def test_exactly_at_limit(self):
        """Exactly the limit is still accepted."""
        at_limit = [f'GA{i:09d}' for i in range(MAX_LOT_IDS)]
        assert validate_lot_input('lot_id', at_limit) is None

    def test_unknown_input_type_uses_default_limit(self):
        """Unknown input types fall back to the default (LOT ID) limit."""
        within = [f'X{i}' for i in range(MAX_LOT_IDS)]
        assert validate_lot_input('unknown_type', within) is None
        beyond = [f'X{i}' for i in range(MAX_LOT_IDS + 1)]
        assert validate_lot_input('unknown_type', beyond) is not None
class TestValidateEquipmentInput:
    """Tests for validate_equipment_input function."""

    def test_valid_equipment_ids(self):
        """IDs within the cap validate cleanly (None means valid)."""
        assert validate_equipment_input(['EQ001', 'EQ002', 'EQ003']) is None

    def test_empty_equipment_ids(self):
        """An empty equipment list is rejected."""
        message = validate_equipment_input([])
        assert message is not None
        assert '至少一台' in message

    def test_exceeds_equipment_limit(self):
        """One equipment over the cap is rejected and names the cap."""
        over_limit = [f'EQ{i:05d}' for i in range(MAX_EQUIPMENTS + 1)]
        message = validate_equipment_input(over_limit)
        assert message is not None
        assert '不得超過' in message
        assert str(MAX_EQUIPMENTS) in message

    def test_exactly_at_limit(self):
        """Exactly the cap is still accepted."""
        at_limit = [f'EQ{i:05d}' for i in range(MAX_EQUIPMENTS)]
        assert validate_equipment_input(at_limit) is None
class TestBuildInClause:
    """Tests for the _build_in_clause helper."""

    def test_empty_list(self):
        """No values produce no chunks."""
        assert _build_in_clause([]) == []

    def test_single_value(self):
        """One value yields exactly one quoted chunk."""
        assert _build_in_clause(['VAL001']) == ["'VAL001'"]

    def test_multiple_values(self):
        """Several values are quoted and comma-joined into one chunk."""
        chunks = _build_in_clause(['VAL001', 'VAL002', 'VAL003'])
        assert len(chunks) == 1
        for quoted in ("'VAL001'", "'VAL002'", "'VAL003'"):
            assert quoted in chunks[0]
        assert chunks[0] == "'VAL001', 'VAL002', 'VAL003'"

    def test_chunking(self):
        """Values beyond BATCH_SIZE spill into a second chunk."""
        values = [f'VAL{i:06d}' for i in range(BATCH_SIZE + 10)]
        chunks = _build_in_clause(values)
        assert len(chunks) == 2
        # Each value contributes an opening and a closing quote.
        assert chunks[0].count("'") == BATCH_SIZE * 2

    def test_escape_single_quotes(self):
        """Embedded single quotes are doubled (SQL literal escaping)."""
        chunks = _build_in_clause(["VAL'001"])
        assert len(chunks) == 1
        assert "VAL''001" in chunks[0]

    def test_custom_chunk_size(self):
        """A custom max_chunk_size controls the split (here 2+2+1)."""
        chunks = _build_in_clause(['V1', 'V2', 'V3', 'V4', 'V5'], max_chunk_size=2)
        assert len(chunks) == 3
class TestBuildInFilter:
    """Tests for the _build_in_filter helper."""

    def test_empty_list(self):
        """No values yields the always-false predicate 1=0."""
        assert _build_in_filter([], 'COL') == "1=0"

    def test_single_value(self):
        """One value produces a plain IN clause."""
        clause = _build_in_filter(['VAL001'], 'COL')
        assert "COL IN" in clause
        assert "'VAL001'" in clause

    def test_multiple_values(self):
        """Every value appears inside the IN clause."""
        clause = _build_in_filter(['VAL001', 'VAL002'], 'COL')
        assert "COL IN" in clause
        assert "'VAL001'" in clause
        assert "'VAL002'" in clause

    def test_custom_column(self):
        """The supplied (possibly aliased) column name is used verbatim."""
        assert "t.MYCOL IN" in _build_in_filter(['VAL001'], 't.MYCOL')

    def test_large_list_uses_or(self):
        """Chunked input is OR-joined and wrapped in parentheses."""
        values = [f'VAL{i:06d}' for i in range(BATCH_SIZE + 10)]
        clause = _build_in_filter(values, 'COL')
        assert " OR " in clause
        assert clause.startswith("(")
        assert clause.endswith(")")
class TestServiceConstants:
    """Sanity checks on the service's tuning constants."""

    def test_batch_size_is_reasonable(self):
        """Oracle caps IN-list literals at 1000 entries."""
        assert BATCH_SIZE <= 1000

    def test_max_date_range_is_reasonable(self):
        """The query window is pinned at 90 days."""
        assert MAX_DATE_RANGE_DAYS == 90

    def test_max_lot_ids_is_reasonable(self):
        """The LOT ID cap stays in a sensible band."""
        assert MAX_LOT_IDS >= 10 and MAX_LOT_IDS <= 100

    def test_max_serial_numbers_is_reasonable(self):
        """The serial number cap stays in a sensible band."""
        assert MAX_SERIAL_NUMBERS >= 10 and MAX_SERIAL_NUMBERS <= 100

    def test_max_work_orders_is_reasonable(self):
        """Work orders expand into many LOTs, so the cap is small."""
        assert MAX_WORK_ORDERS <= 20

    def test_max_equipments_is_reasonable(self):
        """The equipment cap stays in a sensible band."""
        assert MAX_EQUIPMENTS >= 5 and MAX_EQUIPMENTS <= 50
class TestGetWorkcenterForGroups:
    """Tests for the _get_workcenters_for_groups helper."""

    def test_calls_filter_cache(self):
        """The helper delegates directly to filter_cache."""
        from unittest.mock import patch
        with patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as mock_get:
            # Imported inside the patch so a module-level bind picks up the mock.
            from mes_dashboard.services.query_tool_service import _get_workcenters_for_groups
            mock_get.return_value = ['DB_1', 'DB_2']
            workcenters = _get_workcenters_for_groups(['DB'])
            mock_get.assert_called_once_with(['DB'])
            assert workcenters == ['DB_1', 'DB_2']

    def test_returns_empty_list_for_unknown_group(self):
        """Unknown groups simply yield an empty workcenter list."""
        from unittest.mock import patch
        with patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as mock_get:
            from mes_dashboard.services.query_tool_service import _get_workcenters_for_groups
            mock_get.return_value = []
            assert _get_workcenters_for_groups(['UNKNOWN']) == []
class TestGetLotHistoryWithWorkcenterFilter:
    """Tests for get_lot_history with the workcenter_groups filter."""

    def test_no_filter_returns_all(self):
        """Without workcenter_groups the template placeholder is stripped."""
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read, \
                patch('mes_dashboard.services.query_tool_service.SQLLoader') as mock_loader:
            from mes_dashboard.services.query_tool_service import get_lot_history
            mock_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
            mock_read.return_value = pd.DataFrame({
                'CONTAINERID': ['abc123'],
                'WORKCENTERNAME': ['DB_1'],
            })
            result = get_lot_history('abc123', workcenter_groups=None)
            assert 'error' not in result
            assert result['filtered_by_groups'] == []
            rendered_sql = mock_read.call_args[0][0]
            assert 'WORKCENTERNAME IN' not in rendered_sql
            assert '{{ WORKCENTER_FILTER }}' not in rendered_sql

    def test_with_filter_adds_condition(self):
        """Groups expand to workcenters and a WORKCENTERNAME condition."""
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read, \
                patch('mes_dashboard.services.query_tool_service.SQLLoader') as mock_loader, \
                patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as mock_get_wc:
            from mes_dashboard.services.query_tool_service import get_lot_history
            mock_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
            mock_get_wc.return_value = ['DB_1', 'DB_2']
            mock_read.return_value = pd.DataFrame({
                'CONTAINERID': ['abc123'],
                'WORKCENTERNAME': ['DB_1'],
            })
            result = get_lot_history('abc123', workcenter_groups=['DB'])
            mock_get_wc.assert_called_once_with(['DB'])
            assert result['filtered_by_groups'] == ['DB']
            rendered_sql = mock_read.call_args[0][0]
            assert 'WORKCENTERNAME' in rendered_sql

    def test_empty_groups_list_no_filter(self):
        """An explicit empty group list behaves like no filter at all."""
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read, \
                patch('mes_dashboard.services.query_tool_service.SQLLoader') as mock_loader:
            from mes_dashboard.services.query_tool_service import get_lot_history
            mock_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
            mock_read.return_value = pd.DataFrame({
                'CONTAINERID': ['abc123'],
                'WORKCENTERNAME': ['DB_1'],
            })
            result = get_lot_history('abc123', workcenter_groups=[])
            assert result['filtered_by_groups'] == []
            rendered_sql = mock_read.call_args[0][0]
            assert 'WORKCENTERNAME IN' not in rendered_sql

    def test_filter_with_empty_workcenters_result(self):
        """A group that maps to no workcenters leaves the query unfiltered."""
        from unittest.mock import patch
        import pandas as pd
        with patch('mes_dashboard.services.query_tool_service.read_sql_df') as mock_read, \
                patch('mes_dashboard.services.query_tool_service.SQLLoader') as mock_loader, \
                patch('mes_dashboard.services.filter_cache.get_workcenters_for_groups') as mock_get_wc:
            from mes_dashboard.services.query_tool_service import get_lot_history
            mock_loader.load.return_value = 'SELECT * FROM t WHERE c = :container_id {{ WORKCENTER_FILTER }}'
            mock_get_wc.return_value = []  # No workcenters for this group
            mock_read.return_value = pd.DataFrame({
                'CONTAINERID': ['abc123'],
                'WORKCENTERNAME': ['DB_1'],
            })
            result = get_lot_history('abc123', workcenter_groups=['UNKNOWN'])
            # Should still succeed, just no filter applied.
            assert 'error' not in result

View File

@@ -15,7 +15,11 @@ from mes_dashboard.routes.health_routes import check_database
@pytest.fixture
def testing_app_factory(monkeypatch):
def _factory(*, csrf_enabled: bool = False):
from mes_dashboard.routes import auth_routes
monkeypatch.setenv("REALTIME_EQUIPMENT_CACHE_ENABLED", "false")
with auth_routes._rate_limit_lock:
auth_routes._login_attempts.clear()
db._ENGINE = None
db._HEALTH_ENGINE = None
app = create_app("testing")
@@ -154,7 +158,8 @@ def test_security_headers_applied_globally(testing_app_factory):
assert response.status_code == 200
assert "Content-Security-Policy" in response.headers
assert response.headers["X-Frame-Options"] == "DENY"
assert "frame-ancestors 'self'" in response.headers["Content-Security-Policy"]
assert response.headers["X-Frame-Options"] == "SAMEORIGIN"
assert response.headers["X-Content-Type-Options"] == "nosniff"
assert "Referrer-Policy" in response.headers

View File

@@ -81,6 +81,24 @@ class TestTemplateIntegration(unittest.TestCase):
self.assertIn('mes-api.js', html)
self.assertIn('mes-toast-container', html)
def test_query_tool_page_includes_base_scripts(self):
    """The query-tool page must ship the shared toast/API assets."""
    response = self.client.get('/query-tool')
    self.assertEqual(response.status_code, 200)
    html = response.data.decode('utf-8')
    for marker in ('toast.js', 'mes-api.js', 'mes-toast-container'):
        self.assertIn(marker, html)
def test_tmtt_defect_page_includes_base_scripts(self):
    """The tmtt-defect page must ship the shared toast/API assets."""
    response = self.client.get('/tmtt-defect')
    self.assertEqual(response.status_code, 200)
    html = response.data.decode('utf-8')
    for marker in ('toast.js', 'mes-api.js', 'mes-toast-container'):
        self.assertIn(marker, html)
class TestToastCSSIntegration(unittest.TestCase):
"""Test that Toast CSS styles are included in pages."""
@@ -148,11 +166,29 @@ class TestMesApiUsageInTemplates(unittest.TestCase):
response = self.client.get('/resource')
html = response.data.decode('utf-8')
self.assertTrue('MesApi.post' in html or '/static/dist/resource-status.js' in html)
self.assertTrue(
'MesApi.post' in html or
'MesApi.get' in html or
'/static/dist/resource-status.js' in html
)
def test_query_tool_page_uses_vite_module(self):
    """The query-tool page loads its Vite bundle as an ES module."""
    html = self.client.get('/query-tool').data.decode('utf-8')
    self.assertIn('/static/dist/query-tool.js', html)
    self.assertIn('type="module"', html)
def test_tmtt_defect_page_uses_vite_module(self):
    """The tmtt-defect page loads its Vite bundle as an ES module."""
    html = self.client.get('/tmtt-defect').data.decode('utf-8')
    self.assertIn('/static/dist/tmtt-defect.js', html)
    self.assertIn('type="module"', html)
class TestViteModuleFallbackIntegration(unittest.TestCase):
"""Ensure page templates support Vite module assets with inline fallback."""
class TestViteModuleIntegration(unittest.TestCase):
"""Ensure page templates render Vite module assets."""
def setUp(self):
db._ENGINE = None
@@ -161,25 +197,7 @@ class TestViteModuleFallbackIntegration(unittest.TestCase):
self.client = self.app.test_client()
_login_as_admin(self.client)
def test_pages_render_inline_fallback_when_asset_missing(self):
endpoints_and_markers = [
('/wip-overview', 'function applyFilters'),
('/wip-detail', 'function init'),
('/hold-detail?reason=test-reason', 'function loadAllData'),
('/tables', 'function loadTableData'),
('/resource', 'function loadData'),
('/resource-history', 'function executeQuery'),
('/job-query', 'function queryJobs'),
('/excel-query', 'function uploadExcel'),
]
for endpoint, marker in endpoints_and_markers:
with patch('mes_dashboard.app.os.path.exists', return_value=False):
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
html = response.data.decode('utf-8')
self.assertIn(marker, html)
def test_pages_render_vite_module_when_asset_exists(self):
def test_pages_render_vite_module_reference(self):
endpoints_and_assets = [
('/wip-overview', 'wip-overview.js'),
('/wip-detail', 'wip-detail.js'),
@@ -189,9 +207,11 @@ class TestViteModuleFallbackIntegration(unittest.TestCase):
('/resource-history', 'resource-history.js'),
('/job-query', 'job-query.js'),
('/excel-query', 'excel-query.js'),
('/query-tool', 'query-tool.js'),
('/tmtt-defect', 'tmtt-defect.js'),
]
for endpoint, asset in endpoints_and_assets:
with patch('mes_dashboard.app.os.path.exists', return_value=True):
with patch('mes_dashboard.app.os.path.exists', return_value=False):
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
html = response.data.decode('utf-8')

View File

@@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
"""Integration tests for TMTT Defect Analysis API routes."""
import unittest
from unittest.mock import patch
import pandas as pd
class TestTmttDefectAnalysisEndpoint(unittest.TestCase):
    """Test GET /api/tmtt-defect/analysis endpoint.

    Covers request validation (missing dates, bad format, 180-day cap),
    the success payload shape, and the 500 path when the service layer
    returns nothing. The service function is mocked throughout, so only
    the route's parameter handling and response mapping are exercised.
    """
    def setUp(self):
        # Reset the module-level engine cache so create_app() builds a
        # fresh app that does not reuse a connection from a prior test.
        from mes_dashboard.core import database as db
        db._ENGINE = None
        from mes_dashboard.app import create_app
        self.app = create_app()
        self.client = self.app.test_client()
    def test_missing_start_date(self):
        # Omitting start_date must be rejected before the service is called.
        resp = self.client.get('/api/tmtt-defect/analysis?end_date=2025-01-31')
        self.assertEqual(resp.status_code, 400)
        data = resp.get_json()
        self.assertFalse(data['success'])
    def test_missing_end_date(self):
        # Omitting end_date must likewise yield a 400 error envelope.
        resp = self.client.get('/api/tmtt-defect/analysis?start_date=2025-01-01')
        self.assertEqual(resp.status_code, 400)
        data = resp.get_json()
        self.assertFalse(data['success'])
    def test_missing_both_dates(self):
        resp = self.client.get('/api/tmtt-defect/analysis')
        self.assertEqual(resp.status_code, 400)
    @patch('mes_dashboard.routes.tmtt_defect_routes.query_tmtt_defect_analysis')
    def test_invalid_date_format(self, mock_query):
        # The service reports format errors via an 'error' key; the route
        # must translate that into a 400 with the message passed through.
        mock_query.return_value = {'error': '日期格式無效,請使用 YYYY-MM-DD'}
        resp = self.client.get(
            '/api/tmtt-defect/analysis?start_date=invalid&end_date=2025-01-31'
        )
        self.assertEqual(resp.status_code, 400)
        data = resp.get_json()
        self.assertFalse(data['success'])
        self.assertIn('格式', data['error'])
    @patch('mes_dashboard.routes.tmtt_defect_routes.query_tmtt_defect_analysis')
    def test_exceeds_180_days(self, mock_query):
        # Ranges longer than 180 days are rejected with the cap in the message.
        mock_query.return_value = {'error': '查詢範圍不能超過 180 天'}
        resp = self.client.get(
            '/api/tmtt-defect/analysis?start_date=2025-01-01&end_date=2025-12-31'
        )
        self.assertEqual(resp.status_code, 400)
        data = resp.get_json()
        self.assertIn('180', data['error'])
    @patch('mes_dashboard.routes.tmtt_defect_routes.query_tmtt_defect_analysis')
    def test_successful_query(self, mock_query):
        # A complete service result must be wrapped as
        # {'success': True, 'data': {kpi, charts, detail}}.
        mock_query.return_value = {
            'kpi': {
                'total_input': 1000, 'lot_count': 10,
                'print_defect_qty': 5, 'print_defect_rate': 0.5,
                'lead_defect_qty': 3, 'lead_defect_rate': 0.3,
            },
            'charts': {
                'by_workflow': [], 'by_package': [], 'by_type': [],
                'by_tmtt_machine': [], 'by_mold_machine': [],
            },
            'detail': [],
        }
        resp = self.client.get(
            '/api/tmtt-defect/analysis?start_date=2025-01-01&end_date=2025-01-31'
        )
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        self.assertTrue(data['success'])
        self.assertIn('kpi', data['data'])
        self.assertIn('charts', data['data'])
        self.assertIn('detail', data['data'])
        # Verify separate defect rates
        kpi = data['data']['kpi']
        self.assertEqual(kpi['print_defect_qty'], 5)
        self.assertEqual(kpi['lead_defect_qty'], 3)
    @patch('mes_dashboard.routes.tmtt_defect_routes.query_tmtt_defect_analysis')
    def test_query_failure_returns_500(self, mock_query):
        # None from the service (e.g. DB failure) must map to HTTP 500.
        mock_query.return_value = None
        resp = self.client.get(
            '/api/tmtt-defect/analysis?start_date=2025-01-01&end_date=2025-01-31'
        )
        self.assertEqual(resp.status_code, 500)
class TestTmttDefectExportEndpoint(unittest.TestCase):
    """Test GET /api/tmtt-defect/export endpoint.

    Checks date validation and that the CSV export is streamed with the
    expected content type and attachment disposition (export generator
    is mocked, so no database access occurs).
    """
    def setUp(self):
        # Reset the cached engine so create_app() starts from a clean state.
        from mes_dashboard.core import database as db
        db._ENGINE = None
        from mes_dashboard.app import create_app
        self.app = create_app()
        self.client = self.app.test_client()
    def test_missing_dates(self):
        resp = self.client.get('/api/tmtt-defect/export')
        self.assertEqual(resp.status_code, 400)
    @patch('mes_dashboard.routes.tmtt_defect_routes.export_csv')
    def test_export_csv(self, mock_export):
        # Simulate the streaming generator: a UTF-8 BOM chunk followed by
        # the CSV header row, as the real exporter emits for Excel.
        mock_export.return_value = iter([
            '\ufeff',
            'LOT ID,TYPE,PACKAGE,WORKFLOW,完工流水碼,TMTT設備,MOLD設備,'
            '投入數,印字不良數,印字不良率(%),腳型不良數,腳型不良率(%)\r\n',
        ])
        resp = self.client.get(
            '/api/tmtt-defect/export?start_date=2025-01-01&end_date=2025-01-31'
        )
        self.assertEqual(resp.status_code, 200)
        self.assertIn('text/csv', resp.content_type)
        self.assertIn('attachment', resp.headers.get('Content-Disposition', ''))
class TestTmttDefectPageRoute(unittest.TestCase):
    """Test page route.

    Only smoke-tests /tmtt-defect; the accepted status set is deliberately
    loose because the page's availability depends on its configured
    page_status and the (unauthenticated) test session.
    """
    def setUp(self):
        # Reset the cached engine so create_app() starts from a clean state.
        from mes_dashboard.core import database as db
        db._ENGINE = None
        from mes_dashboard.app import create_app
        self.app = create_app()
        self.client = self.app.test_client()
    def test_page_requires_auth_when_dev(self):
        """Page in 'dev' status returns 403 for unauthenticated users."""
        resp = self.client.get('/tmtt-defect')
        # 403 because page_status is 'dev' and user is not admin
        self.assertIn(resp.status_code, [200, 403])
# Allow running this test module directly (outside the pytest runner).
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,287 @@
# -*- coding: utf-8 -*-
"""Unit tests for TMTT Defect Analysis Service."""
import unittest
from unittest.mock import patch, MagicMock
import pandas as pd
from mes_dashboard.services.tmtt_defect_service import (
_build_kpi,
_build_chart_data,
_build_all_charts,
_build_detail_table,
_validate_date_range,
query_tmtt_defect_analysis,
PRINT_DEFECT,
LEAD_DEFECT,
)
def _make_df(rows):
"""Helper to create test DataFrame from list of dicts."""
cols = [
'CONTAINERID', 'CONTAINERNAME', 'PJ_TYPE', 'PRODUCTLINENAME',
'WORKFLOW', 'FINISHEDRUNCARD', 'TMTT_EQUIPMENTID',
'TMTT_EQUIPMENTNAME', 'TRACKINQTY', 'TRACKINTIMESTAMP',
'MOLD_EQUIPMENTID', 'MOLD_EQUIPMENTNAME',
'LOSSREASONNAME', 'REJECTQTY',
]
if not rows:
return pd.DataFrame(columns=cols)
df = pd.DataFrame(rows)
for c in cols:
if c not in df.columns:
df[c] = None
return df
class TestValidateDateRange(unittest.TestCase):
    """Test date range validation.

    _validate_date_range returns None for a valid range and an error
    message string otherwise (format, ordering, or 180-day cap).
    """
    def test_valid_range(self):
        self.assertIsNone(_validate_date_range('2025-01-01', '2025-01-31'))
    def test_invalid_format(self):
        # Slash-separated dates are not accepted; only YYYY-MM-DD.
        result = _validate_date_range('2025/01/01', '2025-01-31')
        self.assertIn('格式', result)
    def test_start_after_end(self):
        result = _validate_date_range('2025-02-01', '2025-01-01')
        self.assertIn('不能晚於', result)
    def test_exceeds_max_days(self):
        result = _validate_date_range('2025-01-01', '2025-12-31')
        self.assertIn('180', result)
    def test_exactly_max_days(self):
        # Boundary case: a span right at the 180-day limit is still valid.
        self.assertIsNone(_validate_date_range('2025-01-01', '2025-06-30'))
class TestBuildKpi(unittest.TestCase):
    """Test KPI calculation with separate defect rates.

    _build_kpi must deduplicate INPUT per container, count distinct lots,
    and compute print/lead defect quantities and rates independently.
    """
    def test_empty_dataframe(self):
        # No data: all counters zero and rates 0.0 (no division by zero).
        df = _make_df([])
        kpi = _build_kpi(df)
        self.assertEqual(kpi['total_input'], 0)
        self.assertEqual(kpi['lot_count'], 0)
        self.assertEqual(kpi['print_defect_qty'], 0)
        self.assertEqual(kpi['lead_defect_qty'], 0)
        self.assertEqual(kpi['print_defect_rate'], 0.0)
        self.assertEqual(kpi['lead_defect_rate'], 0.0)
    def test_single_lot_no_defects(self):
        df = _make_df([{
            'CONTAINERID': 'A001', 'TRACKINQTY': 100,
            'LOSSREASONNAME': None, 'REJECTQTY': 0,
        }])
        kpi = _build_kpi(df)
        self.assertEqual(kpi['total_input'], 100)
        self.assertEqual(kpi['lot_count'], 1)
        self.assertEqual(kpi['print_defect_qty'], 0)
        self.assertEqual(kpi['lead_defect_qty'], 0)
    def test_separate_defect_rates(self):
        """A LOT with both print and lead defects - rates calculated separately."""
        df = _make_df([
            {'CONTAINERID': 'A001', 'TRACKINQTY': 10000,
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 50},
            {'CONTAINERID': 'A001', 'TRACKINQTY': 10000,
             'LOSSREASONNAME': LEAD_DEFECT, 'REJECTQTY': 30},
        ])
        kpi = _build_kpi(df)
        # INPUT should be deduplicated (10000, not 20000)
        self.assertEqual(kpi['total_input'], 10000)
        self.assertEqual(kpi['lot_count'], 1)
        self.assertEqual(kpi['print_defect_qty'], 50)
        self.assertEqual(kpi['lead_defect_qty'], 30)
        # Rates are percentages: 50/10000 -> 0.5%, 30/10000 -> 0.3%.
        self.assertAlmostEqual(kpi['print_defect_rate'], 0.5, places=4)
        self.assertAlmostEqual(kpi['lead_defect_rate'], 0.3, places=4)
    def test_multiple_lots(self):
        # Distinct containers: inputs sum, lot_count counts unique IDs.
        df = _make_df([
            {'CONTAINERID': 'A001', 'TRACKINQTY': 100,
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 2},
            {'CONTAINERID': 'A002', 'TRACKINQTY': 200,
             'LOSSREASONNAME': LEAD_DEFECT, 'REJECTQTY': 1},
            {'CONTAINERID': 'A003', 'TRACKINQTY': 300,
             'LOSSREASONNAME': None, 'REJECTQTY': 0},
        ])
        kpi = _build_kpi(df)
        self.assertEqual(kpi['total_input'], 600)
        self.assertEqual(kpi['lot_count'], 3)
        self.assertEqual(kpi['print_defect_qty'], 2)
        self.assertEqual(kpi['lead_defect_qty'], 1)
class TestBuildChartData(unittest.TestCase):
    """Test Pareto chart data aggregation.

    _build_chart_data(df, dimension) groups defects by the given column,
    sorts descending by total defects, and carries a cumulative percentage
    for Pareto rendering. Null dimension values fall into a '(未知)' bucket.
    """
    def test_empty_dataframe(self):
        df = _make_df([])
        result = _build_chart_data(df, 'PJ_TYPE')
        self.assertEqual(result, [])
    def test_single_dimension_value(self):
        # Both defect kinds on one dimension value combine into one entry.
        df = _make_df([
            {'CONTAINERID': 'A001', 'TRACKINQTY': 100, 'PJ_TYPE': 'TypeA',
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 5},
            {'CONTAINERID': 'A001', 'TRACKINQTY': 100, 'PJ_TYPE': 'TypeA',
             'LOSSREASONNAME': LEAD_DEFECT, 'REJECTQTY': 3},
        ])
        result = _build_chart_data(df, 'PJ_TYPE')
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]['name'], 'TypeA')
        self.assertEqual(result[0]['print_defect_qty'], 5)
        self.assertEqual(result[0]['lead_defect_qty'], 3)
        self.assertEqual(result[0]['total_defect_qty'], 8)
        # Single bucket holds 100% of defects by definition.
        self.assertAlmostEqual(result[0]['cumulative_pct'], 100.0)
    def test_null_dimension_grouped_as_unknown(self):
        df = _make_df([
            {'CONTAINERID': 'A001', 'TRACKINQTY': 100, 'MOLD_EQUIPMENTNAME': None,
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 2},
        ])
        result = _build_chart_data(df, 'MOLD_EQUIPMENTNAME')
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]['name'], '(未知)')
    def test_sorted_by_total_defect_desc(self):
        df = _make_df([
            {'CONTAINERID': 'A001', 'TRACKINQTY': 100, 'PJ_TYPE': 'TypeA',
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 1},
            {'CONTAINERID': 'A002', 'TRACKINQTY': 100, 'PJ_TYPE': 'TypeB',
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 10},
        ])
        result = _build_chart_data(df, 'PJ_TYPE')
        self.assertEqual(result[0]['name'], 'TypeB')
        self.assertEqual(result[1]['name'], 'TypeA')
    def test_cumulative_percentage(self):
        df = _make_df([
            {'CONTAINERID': 'A001', 'TRACKINQTY': 100, 'PJ_TYPE': 'TypeA',
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 6},
            {'CONTAINERID': 'A002', 'TRACKINQTY': 100, 'PJ_TYPE': 'TypeB',
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 4},
        ])
        result = _build_chart_data(df, 'PJ_TYPE')
        # TypeA: 6/10 = 60%, TypeB: cumulative 10/10 = 100%
        self.assertAlmostEqual(result[0]['cumulative_pct'], 60.0)
        self.assertAlmostEqual(result[1]['cumulative_pct'], 100.0)
class TestBuildAllCharts(unittest.TestCase):
    """Test all 5 chart dimensions are built.

    _build_all_charts must return one chart list per analysis dimension:
    workflow, package (product line), type, TMTT machine, and MOLD machine.
    """
    def test_returns_all_dimensions(self):
        df = _make_df([{
            'CONTAINERID': 'A001', 'TRACKINQTY': 100,
            'WORKFLOW': 'WF1', 'PRODUCTLINENAME': 'PKG1',
            'PJ_TYPE': 'T1', 'TMTT_EQUIPMENTNAME': 'TMTT-1',
            'MOLD_EQUIPMENTNAME': 'MOLD-1',
            'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 1,
        }])
        charts = _build_all_charts(df)
        self.assertIn('by_workflow', charts)
        self.assertIn('by_package', charts)
        self.assertIn('by_type', charts)
        self.assertIn('by_tmtt_machine', charts)
        self.assertIn('by_mold_machine', charts)
class TestBuildDetailTable(unittest.TestCase):
    """Test detail table building.

    _build_detail_table aggregates to one row per LOT, combining print and
    lead defect quantities and computing per-LOT percentage rates.
    """
    def test_empty_dataframe(self):
        df = _make_df([])
        result = _build_detail_table(df)
        self.assertEqual(result, [])
    def test_single_lot_aggregated(self):
        """LOT with both defect types should produce one row."""
        df = _make_df([
            {'CONTAINERID': 'A001', 'CONTAINERNAME': 'LOT-001',
             'TRACKINQTY': 100, 'PJ_TYPE': 'T1', 'PRODUCTLINENAME': 'P1',
             'WORKFLOW': 'WF1', 'FINISHEDRUNCARD': 'RC001',
             'TMTT_EQUIPMENTNAME': 'TMTT-1', 'MOLD_EQUIPMENTNAME': 'MOLD-1',
             'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 5},
            {'CONTAINERID': 'A001', 'CONTAINERNAME': 'LOT-001',
             'TRACKINQTY': 100, 'PJ_TYPE': 'T1', 'PRODUCTLINENAME': 'P1',
             'WORKFLOW': 'WF1', 'FINISHEDRUNCARD': 'RC001',
             'TMTT_EQUIPMENTNAME': 'TMTT-1', 'MOLD_EQUIPMENTNAME': 'MOLD-1',
             'LOSSREASONNAME': LEAD_DEFECT, 'REJECTQTY': 3},
        ])
        result = _build_detail_table(df)
        self.assertEqual(len(result), 1)
        row = result[0]
        self.assertEqual(row['CONTAINERNAME'], 'LOT-001')
        self.assertEqual(row['INPUT_QTY'], 100)
        self.assertEqual(row['PRINT_DEFECT_QTY'], 5)
        self.assertEqual(row['LEAD_DEFECT_QTY'], 3)
        # Rates are percentages of INPUT_QTY: 5/100 -> 5.0%, 3/100 -> 3.0%.
        self.assertAlmostEqual(row['PRINT_DEFECT_RATE'], 5.0, places=4)
        self.assertAlmostEqual(row['LEAD_DEFECT_RATE'], 3.0, places=4)
    def test_lot_with_no_defects(self):
        # A defect-free LOT still appears, with zeroed defect quantities.
        df = _make_df([{
            'CONTAINERID': 'A001', 'CONTAINERNAME': 'LOT-001',
            'TRACKINQTY': 100, 'PJ_TYPE': 'T1',
            'LOSSREASONNAME': None, 'REJECTQTY': 0,
        }])
        result = _build_detail_table(df)
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]['PRINT_DEFECT_QTY'], 0)
        self.assertEqual(result[0]['LEAD_DEFECT_QTY'], 0)
class TestQueryTmttDefectAnalysis(unittest.TestCase):
    """Test the main entry point function.

    Exercises validation short-circuits, the cache hit/miss paths, the
    full result shape on a successful fetch, and the None return when
    the base-data fetch fails.
    """
    def setUp(self):
        # Reset the cached engine so no real DB connection leaks in.
        from mes_dashboard.core import database as db
        db._ENGINE = None
    # NOTE: @patch decorators apply bottom-up, so the mock arguments
    # arrive as (mock_fetch, mock_cache_set, mock_cache_get).
    @patch('mes_dashboard.services.tmtt_defect_service.cache_get', return_value=None)
    @patch('mes_dashboard.services.tmtt_defect_service.cache_set')
    @patch('mes_dashboard.services.tmtt_defect_service._fetch_base_data')
    def test_valid_query(self, mock_fetch, mock_cache_set, mock_cache_get):
        mock_fetch.return_value = _make_df([{
            'CONTAINERID': 'A001', 'CONTAINERNAME': 'LOT-001',
            'TRACKINQTY': 100, 'PJ_TYPE': 'T1', 'PRODUCTLINENAME': 'P1',
            'WORKFLOW': 'WF1', 'FINISHEDRUNCARD': 'RC001',
            'TMTT_EQUIPMENTNAME': 'TMTT-1', 'MOLD_EQUIPMENTNAME': 'MOLD-1',
            'LOSSREASONNAME': PRINT_DEFECT, 'REJECTQTY': 2,
        }])
        result = query_tmtt_defect_analysis('2025-01-01', '2025-01-31')
        self.assertIn('kpi', result)
        self.assertIn('charts', result)
        self.assertIn('detail', result)
        self.assertNotIn('error', result)
        # A fresh (cache-miss) result must be written back to the cache.
        mock_cache_set.assert_called_once()
    def test_invalid_dates(self):
        result = query_tmtt_defect_analysis('invalid', '2025-01-31')
        self.assertIn('error', result)
    def test_exceeds_max_days(self):
        result = query_tmtt_defect_analysis('2025-01-01', '2025-12-31')
        self.assertIn('error', result)
        self.assertIn('180', result['error'])
    @patch('mes_dashboard.services.tmtt_defect_service.cache_get')
    def test_cache_hit(self, mock_cache_get):
        # On a cache hit the cached payload is returned as-is.
        cached_data = {'kpi': {}, 'charts': {}, 'detail': []}
        mock_cache_get.return_value = cached_data
        result = query_tmtt_defect_analysis('2025-01-01', '2025-01-31')
        self.assertEqual(result, cached_data)
    @patch('mes_dashboard.services.tmtt_defect_service.cache_get', return_value=None)
    @patch('mes_dashboard.services.tmtt_defect_service._fetch_base_data', return_value=None)
    def test_query_failure(self, mock_fetch, mock_cache_get):
        # A failed base-data fetch propagates as None (caller maps to 500).
        result = query_tmtt_defect_analysis('2025-01-01', '2025-01-31')
        self.assertIsNone(result)
# Allow running this test module directly (outside the pytest runner).
if __name__ == '__main__':
    unittest.main()

View File

@@ -22,14 +22,20 @@ from mes_dashboard.services.wip_service import (
)
def disable_cache(func):
"""Decorator to disable Redis cache for Oracle fallback tests."""
@wraps(func)
def wrapper(*args, **kwargs):
with patch('mes_dashboard.services.wip_service.get_cached_wip_data', return_value=None):
with patch('mes_dashboard.services.wip_service.get_cached_sys_date', return_value=None):
return func(*args, **kwargs)
return wrapper
def disable_cache(func):
    """Decorator to disable Redis cache for Oracle fallback tests."""
    @wraps(func)
    def inner(*args, **kwargs):
        import mes_dashboard.services.wip_service as wip_service
        # Purge the process-local caches first so the wrapped test cannot
        # be satisfied from memory left over by earlier tests.
        with wip_service._wip_search_index_lock:
            wip_service._wip_search_index_cache.clear()
        with wip_service._wip_snapshot_lock:
            wip_service._wip_snapshot_cache.clear()
        # Then force both Redis lookups to miss for the duration of the call.
        no_wip_cache = patch(
            'mes_dashboard.services.wip_service.get_cached_wip_data',
            return_value=None,
        )
        no_sys_date = patch(
            'mes_dashboard.services.wip_service.get_cached_sys_date',
            return_value=None,
        )
        with no_wip_cache, no_sys_date:
            return func(*args, **kwargs)
    return inner
class TestWipServiceConfig(unittest.TestCase):