chore: reinitialize project with vite architecture

This commit is contained in:
beabigegg
2026-02-08 08:30:48 +08:00
commit b56e80381b
264 changed files with 75752 additions and 0 deletions

50
tests/e2e/conftest.py Normal file
View File

@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
"""Pytest configuration for Playwright E2E tests."""
import pytest
import os
import sys
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
@pytest.fixture(scope="session")
def app_server() -> str:
"""Get the base URL for E2E testing.
Uses environment variable E2E_BASE_URL or defaults to production server.
"""
return os.environ.get('E2E_BASE_URL', 'http://127.0.0.1:8080')
@pytest.fixture(scope="session")
def browser_context_args(browser_context_args):
"""Configure browser context for tests."""
return {
**browser_context_args,
"viewport": {"width": 1280, "height": 720},
"locale": "zh-TW",
}
def pytest_configure(config):
"""Add custom markers for E2E tests."""
config.addinivalue_line(
"markers", "e2e: mark test as end-to-end test (requires running server)"
)
config.addinivalue_line(
"markers", "redis: mark test as requiring Redis connection"
)
@pytest.fixture(scope="session")
def api_base_url(app_server):
"""Get the API base URL."""
return f"{app_server}/api"
@pytest.fixture(scope="session")
def health_url(app_server):
"""Get the health check URL."""
return f"{app_server}/health"

View File

@@ -0,0 +1,350 @@
# -*- coding: utf-8 -*-
"""End-to-end tests for admin authentication flow.
These tests simulate real user workflows through the admin authentication system.
Run with: pytest tests/e2e/test_admin_auth_e2e.py -v --run-integration
"""
import json
import pytest
from unittest.mock import patch, MagicMock
import tempfile
from pathlib import Path
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
from mes_dashboard.services import page_registry
@pytest.fixture
def temp_page_status(tmp_path):
"""Create temporary page status file."""
data_file = tmp_path / "page_status.json"
initial_data = {
"pages": [
{"route": "/", "name": "首頁", "status": "released"},
{"route": "/wip-overview", "name": "WIP 即時概況", "status": "released"},
{"route": "/wip-detail", "name": "WIP 明細", "status": "released"},
{"route": "/tables", "name": "表格總覽", "status": "dev"},
{"route": "/resource", "name": "機台狀態", "status": "dev"},
],
"api_public": True
}
data_file.write_text(json.dumps(initial_data, ensure_ascii=False), encoding="utf-8")
return data_file
@pytest.fixture
def app(temp_page_status):
"""Create application for testing."""
db._ENGINE = None
# Mock page registry
original_data_file = page_registry.DATA_FILE
original_cache = page_registry._cache
page_registry.DATA_FILE = temp_page_status
page_registry._cache = None
app = create_app('testing')
app.config['TESTING'] = True
app.config['WTF_CSRF_ENABLED'] = False
yield app
page_registry.DATA_FILE = original_data_file
page_registry._cache = original_cache
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
def mock_ldap_success(mail="ymirliu@panjit.com.tw"):
"""Helper to create mock for successful LDAP auth."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"user": {
"username": "92367",
"displayName": "Test Admin",
"mail": mail,
"department": "Test Department"
}
}
return mock_response
class TestFullLoginLogoutFlow:
"""E2E tests for complete login/logout flow."""
@patch('mes_dashboard.services.auth_service.requests.post')
def test_complete_admin_login_workflow(self, mock_post, client):
"""Test complete admin login workflow."""
mock_post.return_value = mock_ldap_success()
# 1. Access portal - should see login link
response = client.get("/")
assert response.status_code == 200
content = response.data.decode("utf-8")
assert "管理員登入" in content
# 2. Go to login page
response = client.get("/admin/login")
assert response.status_code == 200
# 3. Submit login form
response = client.post("/admin/login", data={
"username": "92367",
"password": "password123"
}, follow_redirects=True)
assert response.status_code == 200
content = response.data.decode("utf-8")
# Should see admin name and logout option
assert "Test Admin" in content or "登出" in content
# 4. Verify session has admin
with client.session_transaction() as sess:
assert "admin" in sess
assert sess["admin"]["mail"] == "ymirliu@panjit.com.tw"
# 5. Access admin pages
response = client.get("/admin/pages")
assert response.status_code == 200
# 6. Logout
response = client.get("/admin/logout", follow_redirects=True)
assert response.status_code == 200
# 7. Verify logged out
with client.session_transaction() as sess:
assert "admin" not in sess
# 8. Admin pages should redirect now
response = client.get("/admin/pages", follow_redirects=False)
assert response.status_code == 302
class TestPageAccessControlFlow:
"""E2E tests for page access control flow."""
def test_non_admin_cannot_access_dev_pages(self, client, temp_page_status):
"""Test non-admin users cannot access dev pages."""
# 1. Access released page - should work
response = client.get("/wip-overview")
assert response.status_code != 403
# 2. Access dev page - should get 403
response = client.get("/tables")
assert response.status_code == 403
content = response.data.decode("utf-8")
assert "開發中" in content or "403" in content
@patch('mes_dashboard.services.auth_service.requests.post')
def test_admin_can_access_all_pages(self, mock_post, client, temp_page_status):
"""Test admin users can access all pages."""
mock_post.return_value = mock_ldap_success()
# 1. Login as admin
client.post("/admin/login", data={
"username": "92367",
"password": "password123"
})
# 2. Access released page - should work
response = client.get("/wip-overview")
assert response.status_code != 403
# 3. Access dev page - should work for admin
response = client.get("/tables")
assert response.status_code != 403
class TestPageManagementFlow:
"""E2E tests for page management flow."""
@patch('mes_dashboard.services.auth_service.requests.post')
def test_admin_can_change_page_status(self, mock_post, client, temp_page_status):
"""Test admin can change page status via management interface."""
mock_post.return_value = mock_ldap_success()
# 1. Login as admin
client.post("/admin/login", data={
"username": "92367",
"password": "password123"
})
# 2. Get current pages list
response = client.get("/admin/api/pages")
assert response.status_code == 200
data = json.loads(response.data)
assert data["success"] is True
# 3. Change /wip-overview from released to dev
response = client.put(
"/admin/api/pages/wip-overview",
data=json.dumps({"status": "dev"}),
content_type="application/json"
)
assert response.status_code == 200
# 4. Verify change persisted
page_registry._cache = None
status = page_registry.get_page_status("/wip-overview")
assert status == "dev"
# 5. Logout
client.get("/admin/logout")
# 6. Now non-admin should get 403 on /wip-overview
response = client.get("/wip-overview")
assert response.status_code == 403
@patch('mes_dashboard.services.auth_service.requests.post')
def test_release_dev_page_makes_it_public(self, mock_post, client, temp_page_status):
"""Test releasing a dev page makes it publicly accessible."""
mock_post.return_value = mock_ldap_success()
# 1. Verify /tables is currently dev (403 for non-admin)
response = client.get("/tables")
assert response.status_code == 403
# 2. Login as admin
client.post("/admin/login", data={
"username": "92367",
"password": "password123"
})
# 3. Release the page
response = client.put(
"/admin/api/pages/tables",
data=json.dumps({"status": "released"}),
content_type="application/json"
)
assert response.status_code == 200
# 4. Logout
client.get("/admin/logout")
# 5. Clear cache and verify non-admin can access
page_registry._cache = None
response = client.get("/tables")
assert response.status_code != 403
class TestPortalDynamicTabs:
"""E2E tests for dynamic portal tabs based on page status."""
def test_portal_hides_dev_tabs_for_non_admin(self, client, temp_page_status):
"""Test portal hides dev page tabs for non-admin users."""
response = client.get("/")
assert response.status_code == 200
content = response.data.decode("utf-8")
# Released pages should show
assert "WIP 即時概況" in content
# Dev pages should NOT show (tables and resource are dev)
# Note: This depends on the can_view_page implementation in portal.html
@patch('mes_dashboard.services.auth_service.requests.post')
def test_portal_shows_all_tabs_for_admin(self, mock_post, client, temp_page_status):
"""Test portal shows all tabs for admin users."""
mock_post.return_value = mock_ldap_success()
# Login as admin
client.post("/admin/login", data={
"username": "92367",
"password": "password123"
})
response = client.get("/")
assert response.status_code == 200
content = response.data.decode("utf-8")
# Admin should see all pages
assert "WIP 即時概況" in content
class TestSessionPersistence:
"""E2E tests for session persistence."""
@patch('mes_dashboard.services.auth_service.requests.post')
def test_session_persists_across_requests(self, mock_post, client):
"""Test admin session persists across multiple requests."""
mock_post.return_value = mock_ldap_success()
# Login
client.post("/admin/login", data={
"username": "92367",
"password": "password123"
})
# Make multiple requests
for _ in range(5):
response = client.get("/admin/pages")
assert response.status_code == 200
# Session should still be valid
with client.session_transaction() as sess:
assert "admin" in sess
class TestSecurityScenarios:
"""E2E tests for security scenarios."""
def test_cannot_access_admin_api_without_login(self, client):
"""Test admin APIs are protected."""
# Try to get pages without login
response = client.get("/admin/api/pages", follow_redirects=False)
assert response.status_code == 302
# Try to update page without login
response = client.put(
"/admin/api/pages/wip-overview",
data=json.dumps({"status": "dev"}),
content_type="application/json",
follow_redirects=False
)
assert response.status_code == 302
@patch('mes_dashboard.services.auth_service.requests.post')
def test_non_admin_user_cannot_login(self, mock_post, client):
"""Test non-admin user cannot access admin features."""
# Mock LDAP success but with non-admin email
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"user": {
"username": "99999",
"displayName": "Regular User",
"mail": "regular@panjit.com.tw",
"department": "Test"
}
}
mock_post.return_value = mock_response
# Try to login
response = client.post("/admin/login", data={
"username": "99999",
"password": "password123"
})
# Should fail (show error, not redirect)
assert response.status_code == 200
content = response.data.decode("utf-8")
assert "管理員" in content or "error" in content.lower()
# Should NOT have admin session
with client.session_transaction() as sess:
assert "admin" not in sess
if __name__ == "__main__":
pytest.main([__file__, "-v"])

281
tests/e2e/test_cache_e2e.py Normal file
View File

@@ -0,0 +1,281 @@
# -*- coding: utf-8 -*-
"""End-to-end tests for Redis cache functionality.
These tests require a running server with Redis enabled.
Run with: pytest tests/e2e/test_cache_e2e.py -v
"""
import pytest
import requests
import time
@pytest.mark.e2e
class TestHealthEndpointE2E:
"""E2E tests for /health endpoint."""
def test_health_endpoint_accessible(self, health_url):
"""Test health endpoint is accessible."""
response = requests.get(health_url, timeout=10)
assert response.status_code in [200, 503]
data = response.json()
assert 'status' in data
assert 'services' in data
assert 'cache' in data
def test_health_shows_database_status(self, health_url):
"""Test health endpoint shows database status."""
response = requests.get(health_url, timeout=10)
data = response.json()
assert 'database' in data['services']
assert data['services']['database'] in ['ok', 'error']
def test_health_shows_redis_status(self, health_url):
"""Test health endpoint shows Redis status."""
response = requests.get(health_url, timeout=10)
data = response.json()
assert 'redis' in data['services']
assert data['services']['redis'] in ['ok', 'error', 'disabled']
def test_health_shows_cache_info(self, health_url):
"""Test health endpoint shows cache information."""
response = requests.get(health_url, timeout=10)
data = response.json()
assert 'cache' in data
assert 'enabled' in data['cache']
assert 'sys_date' in data['cache']
assert 'updated_at' in data['cache']
@pytest.mark.e2e
@pytest.mark.redis
class TestCachedWipApiE2E:
"""E2E tests for cached WIP API endpoints."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
return resp_json['data']
return resp_json
def test_wip_summary_returns_data(self, api_base_url):
"""Test WIP summary endpoint returns valid data."""
response = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'totalLots' in data
assert 'totalQtyPcs' in data
assert 'byWipStatus' in data
assert 'dataUpdateDate' in data
def test_wip_summary_status_breakdown(self, api_base_url):
"""Test WIP summary contains correct status breakdown."""
response = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
data = self._unwrap(response.json())
by_status = data['byWipStatus']
assert 'run' in by_status
assert 'queue' in by_status
assert 'hold' in by_status
assert 'qualityHold' in by_status
assert 'nonQualityHold' in by_status
# Each status should have lots and qtyPcs
for status in ['run', 'queue', 'hold']:
assert 'lots' in by_status[status]
assert 'qtyPcs' in by_status[status]
def test_wip_matrix_returns_data(self, api_base_url):
"""Test WIP matrix endpoint returns valid data."""
response = requests.get(f"{api_base_url}/wip/overview/matrix", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'workcenters' in data
assert 'packages' in data
assert 'matrix' in data
assert 'workcenter_totals' in data
assert 'package_totals' in data
assert 'grand_total' in data
def test_wip_workcenters_returns_list(self, api_base_url):
"""Test workcenters endpoint returns list."""
response = requests.get(f"{api_base_url}/wip/meta/workcenters", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
if len(data) > 0:
assert 'name' in data[0]
assert 'lot_count' in data[0]
def test_wip_packages_returns_list(self, api_base_url):
"""Test packages endpoint returns list."""
response = requests.get(f"{api_base_url}/wip/meta/packages", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
if len(data) > 0:
assert 'name' in data[0]
assert 'lot_count' in data[0]
def test_wip_hold_summary_returns_data(self, api_base_url):
"""Test hold summary endpoint returns valid data."""
response = requests.get(f"{api_base_url}/wip/overview/hold", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'items' in data
assert isinstance(data['items'], list)
@pytest.mark.e2e
@pytest.mark.redis
class TestCachePerformanceE2E:
"""E2E tests for cache performance."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
return resp_json['data']
return resp_json
def test_cached_response_is_fast(self, api_base_url):
"""Test cached responses are faster than 2 seconds."""
# First request may load cache
requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
# Second request should be from cache
start = time.time()
response = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
elapsed = time.time() - start
assert response.status_code == 200
# Cached response should be fast (< 2 seconds)
assert elapsed < 2.0, f"Response took {elapsed:.2f}s, expected < 2s"
def test_multiple_endpoints_consistent(self, api_base_url):
"""Test multiple endpoints return consistent data."""
# Get summary
summary_resp = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
summary = self._unwrap(summary_resp.json())
# Get matrix
matrix_resp = requests.get(f"{api_base_url}/wip/overview/matrix", timeout=30)
matrix = self._unwrap(matrix_resp.json())
# Grand total from matrix should match total from summary (approximately)
# There may be slight differences due to filtering
if summary['totalLots'] > 0 and matrix['grand_total'] > 0:
assert summary['totalQtyPcs'] > 0 or matrix['grand_total'] > 0
@pytest.mark.e2e
@pytest.mark.redis
class TestSearchEndpointsE2E:
"""E2E tests for search endpoints with cache."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
data = resp_json['data']
# Search returns {'items': [...]}
if isinstance(data, dict) and 'items' in data:
return data['items']
return data
return resp_json
def test_search_workorders(self, api_base_url):
"""Test workorder search returns results."""
# Use a common pattern that should exist
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'workorder', 'q': 'WO', 'limit': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
def test_search_lotids(self, api_base_url):
"""Test lot ID search returns results."""
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'lotid', 'q': 'LOT', 'limit': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
def test_search_with_short_query_returns_empty(self, api_base_url):
"""Test search with short query returns empty list."""
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'workorder', 'q': 'W'}, # Too short
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert data == []
@pytest.mark.e2e
@pytest.mark.redis
class TestWipDetailE2E:
"""E2E tests for WIP detail endpoint with cache."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
return resp_json['data']
return resp_json
def test_wip_detail_with_workcenter(self, api_base_url):
"""Test WIP detail endpoint for a workcenter."""
# First get list of workcenters
wc_resp = requests.get(f"{api_base_url}/wip/meta/workcenters", timeout=30)
workcenters = self._unwrap(wc_resp.json())
if len(workcenters) > 0:
wc_name = workcenters[0]['name']
response = requests.get(
f"{api_base_url}/wip/detail/{wc_name}",
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'workcenter' in data
assert 'summary' in data
assert 'lots' in data
assert 'pagination' in data
def test_wip_detail_pagination(self, api_base_url):
"""Test WIP detail pagination."""
wc_resp = requests.get(f"{api_base_url}/wip/meta/workcenters", timeout=30)
workcenters = self._unwrap(wc_resp.json())
if len(workcenters) > 0:
wc_name = workcenters[0]['name']
response = requests.get(
f"{api_base_url}/wip/detail/{wc_name}",
params={'page': 1, 'page_size': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert data['pagination']['page'] == 1
assert data['pagination']['page_size'] == 10

View File

@@ -0,0 +1,362 @@
# -*- coding: utf-8 -*-
"""E2E tests for global connection management features.
Tests the MesApi client, Toast notifications, and page functionality
using Playwright.
Run with: pytest tests/e2e/ --headed (to see browser)
"""
import pytest
import re
from playwright.sync_api import Page, expect
@pytest.mark.e2e
class TestPortalPage:
"""E2E tests for the Portal page."""
def test_portal_loads_successfully(self, page: Page, app_server: str):
"""Portal page should load without errors."""
page.goto(app_server)
# Wait for page to load
expect(page.locator('h1')).to_contain_text('MES 報表入口')
def test_portal_has_all_tabs(self, page: Page, app_server: str):
"""Portal should have all navigation tabs."""
page.goto(app_server)
# Check all tabs exist
expect(page.locator('.tab:has-text("WIP 即時概況")')).to_be_visible()
expect(page.locator('.tab:has-text("機台狀態報表")')).to_be_visible()
expect(page.locator('.tab:has-text("數據表查詢工具")')).to_be_visible()
expect(page.locator('.tab:has-text("Excel 批次查詢")')).to_be_visible()
def test_portal_tab_switching(self, page: Page, app_server: str):
"""Portal tabs should switch iframe content."""
page.goto(app_server)
# Click on a different tab
page.locator('.tab:has-text("機台狀態報表")').click()
# Verify the tab is active
expect(page.locator('.tab:has-text("機台狀態報表")')).to_have_class(re.compile(r'active'))
@pytest.mark.e2e
class TestToastNotifications:
"""E2E tests for Toast notification system."""
def test_toast_container_exists(self, page: Page, app_server: str):
"""Toast container should be present in the DOM."""
page.goto(f"{app_server}/wip-overview")
# Toast container should exist in DOM (hidden when empty, which is expected)
page.wait_for_selector('#mes-toast-container', state='attached', timeout=5000)
def test_toast_info_display(self, page: Page, app_server: str):
"""Toast.info() should display info notification."""
page.goto(f"{app_server}/wip-overview")
# Execute Toast.info() in browser context
page.evaluate("Toast.info('Test info message')")
# Verify toast appears
toast = page.locator('.mes-toast-info')
expect(toast).to_be_visible()
expect(toast).to_contain_text('Test info message')
def test_toast_success_display(self, page: Page, app_server: str):
"""Toast.success() should display success notification."""
page.goto(f"{app_server}/wip-overview")
page.evaluate("Toast.success('Operation successful')")
toast = page.locator('.mes-toast-success')
expect(toast).to_be_visible()
expect(toast).to_contain_text('Operation successful')
def test_toast_error_display(self, page: Page, app_server: str):
"""Toast.error() should display error notification."""
page.goto(f"{app_server}/wip-overview")
page.evaluate("Toast.error('An error occurred')")
toast = page.locator('.mes-toast-error')
expect(toast).to_be_visible()
expect(toast).to_contain_text('An error occurred')
def test_toast_error_with_retry(self, page: Page, app_server: str):
"""Toast.error() with retry callback should show retry button."""
page.goto(f"{app_server}/wip-overview")
page.evaluate("Toast.error('Connection failed', { retry: () => console.log('retry clicked') })")
# Verify retry button exists
retry_btn = page.locator('.mes-toast-retry')
expect(retry_btn).to_be_visible()
expect(retry_btn).to_contain_text('重試')
def test_toast_loading_display(self, page: Page, app_server: str):
"""Toast.loading() should display loading notification."""
page.goto(f"{app_server}/wip-overview")
page.evaluate("Toast.loading('Loading data...')")
toast = page.locator('.mes-toast-loading')
expect(toast).to_be_visible()
def test_toast_dismiss(self, page: Page, app_server: str):
"""Toast.dismiss() should remove toast."""
page.goto(f"{app_server}/wip-overview")
# Create and dismiss a toast
toast_id = page.evaluate("Toast.info('Will be dismissed')")
page.evaluate(f"Toast.dismiss({toast_id})")
# Wait for animation
page.wait_for_timeout(500)
# Toast should be gone
expect(page.locator('.mes-toast-info')).not_to_be_visible()
def test_toast_max_limit(self, page: Page, app_server: str):
"""Toast system should enforce max 5 toasts."""
page.goto(f"{app_server}/wip-overview")
# Create 7 toasts
for i in range(7):
page.evaluate(f"Toast.info('Toast {i}')")
# Should only have 5 toasts visible
toasts = page.locator('.mes-toast')
expect(toasts).to_have_count(5)
@pytest.mark.e2e
class TestMesApiClient:
"""E2E tests for MesApi client."""
def test_mesapi_exists_on_page(self, page: Page, app_server: str):
"""MesApi should be available in window scope."""
page.goto(f"{app_server}/wip-overview")
has_mesapi = page.evaluate("typeof MesApi !== 'undefined'")
assert has_mesapi, "MesApi should be defined"
def test_mesapi_has_get_method(self, page: Page, app_server: str):
"""MesApi should have get() method."""
page.goto(f"{app_server}/wip-overview")
has_get = page.evaluate("typeof MesApi.get === 'function'")
assert has_get, "MesApi.get should be a function"
def test_mesapi_has_post_method(self, page: Page, app_server: str):
"""MesApi should have post() method."""
page.goto(f"{app_server}/wip-overview")
has_post = page.evaluate("typeof MesApi.post === 'function'")
assert has_post, "MesApi.post should be a function"
def test_mesapi_request_logging(self, page: Page, app_server: str):
"""MesApi should log requests to console."""
page.goto(f"{app_server}/wip-overview")
# Capture console messages
console_messages = []
page.on("console", lambda msg: console_messages.append(msg.text))
# Make a request (will fail but should log)
page.evaluate("""
(async () => {
try {
await MesApi.get('/api/test-endpoint');
} catch (e) {
// Expected to fail
}
})()
""")
page.wait_for_timeout(1000)
# Check for MesApi log pattern
mesapi_logs = [m for m in console_messages if '[MesApi]' in m]
assert len(mesapi_logs) > 0, "MesApi should log requests with [MesApi] prefix"
@pytest.mark.e2e
class TestWIPOverviewPage:
"""E2E tests for WIP Overview page."""
def test_wip_overview_loads(self, page: Page, app_server: str):
"""WIP Overview page should load."""
page.goto(f"{app_server}/wip-overview")
# Page should have the header
expect(page.locator('body')).to_be_visible()
def test_wip_overview_has_toast_system(self, page: Page, app_server: str):
"""WIP Overview should have Toast system loaded."""
page.goto(f"{app_server}/wip-overview")
has_toast = page.evaluate("typeof Toast !== 'undefined'")
assert has_toast, "Toast should be defined on WIP Overview page"
def test_wip_overview_has_mesapi(self, page: Page, app_server: str):
"""WIP Overview should have MesApi loaded."""
page.goto(f"{app_server}/wip-overview")
has_mesapi = page.evaluate("typeof MesApi !== 'undefined'")
assert has_mesapi, "MesApi should be defined on WIP Overview page"
@pytest.mark.e2e
class TestWIPDetailPage:
"""E2E tests for WIP Detail page."""
def test_wip_detail_loads(self, page: Page, app_server: str):
"""WIP Detail page should load."""
page.goto(f"{app_server}/wip-detail")
expect(page.locator('body')).to_be_visible()
def test_wip_detail_has_toast_system(self, page: Page, app_server: str):
"""WIP Detail should have Toast system loaded."""
page.goto(f"{app_server}/wip-detail")
has_toast = page.evaluate("typeof Toast !== 'undefined'")
assert has_toast, "Toast should be defined on WIP Detail page"
def test_wip_detail_has_mesapi(self, page: Page, app_server: str):
"""WIP Detail should have MesApi loaded."""
page.goto(f"{app_server}/wip-detail")
has_mesapi = page.evaluate("typeof MesApi !== 'undefined'")
assert has_mesapi, "MesApi should be defined on WIP Detail page"
@pytest.mark.e2e
class TestTablesPage:
"""E2E tests for Tables page."""
def test_tables_page_loads(self, page: Page, app_server: str):
"""Tables page should load."""
page.goto(f"{app_server}/tables")
expect(page.locator('h1')).to_contain_text('MES 數據表查詢工具')
def test_tables_has_toast_system(self, page: Page, app_server: str):
"""Tables page should have Toast system loaded."""
page.goto(f"{app_server}/tables")
has_toast = page.evaluate("typeof Toast !== 'undefined'")
assert has_toast, "Toast should be defined on Tables page"
def test_tables_has_mesapi(self, page: Page, app_server: str):
"""Tables page should have MesApi loaded."""
page.goto(f"{app_server}/tables")
has_mesapi = page.evaluate("typeof MesApi !== 'undefined'")
assert has_mesapi, "MesApi should be defined on Tables page"
@pytest.mark.e2e
class TestResourcePage:
"""E2E tests for Resource Status page."""
def test_resource_page_loads(self, page: Page, app_server: str):
"""Resource page should load."""
page.goto(f"{app_server}/resource")
expect(page.locator('body')).to_be_visible()
def test_resource_has_toast_system(self, page: Page, app_server: str):
"""Resource page should have Toast system loaded."""
page.goto(f"{app_server}/resource")
has_toast = page.evaluate("typeof Toast !== 'undefined'")
assert has_toast, "Toast should be defined on Resource page"
def test_resource_has_mesapi(self, page: Page, app_server: str):
"""Resource page should have MesApi loaded."""
page.goto(f"{app_server}/resource")
has_mesapi = page.evaluate("typeof MesApi !== 'undefined'")
assert has_mesapi, "MesApi should be defined on Resource page"
@pytest.mark.e2e
class TestExcelQueryPage:
"""E2E tests for Excel Query page."""
def test_excel_query_page_loads(self, page: Page, app_server: str):
"""Excel Query page should load."""
page.goto(f"{app_server}/excel-query")
expect(page.locator('body')).to_be_visible()
def test_excel_query_has_toast_system(self, page: Page, app_server: str):
"""Excel Query page should have Toast system loaded."""
page.goto(f"{app_server}/excel-query")
has_toast = page.evaluate("typeof Toast !== 'undefined'")
assert has_toast, "Toast should be defined on Excel Query page"
def test_excel_query_has_mesapi(self, page: Page, app_server: str):
"""Excel Query page should have MesApi loaded."""
page.goto(f"{app_server}/excel-query")
has_mesapi = page.evaluate("typeof MesApi !== 'undefined'")
assert has_mesapi, "MesApi should be defined on Excel Query page"
@pytest.mark.e2e
class TestConsoleLogVerification:
"""E2E tests for console log verification (Phase 4.2 tasks)."""
def test_request_has_request_id(self, page: Page, app_server: str):
"""API requests should log with req_xxx ID format."""
page.goto(f"{app_server}/wip-overview")
console_messages = []
page.on("console", lambda msg: console_messages.append(msg.text))
# Trigger an API request
page.evaluate("""
(async () => {
try {
await MesApi.get('/api/wip/overview/summary');
} catch (e) {}
})()
""")
page.wait_for_timeout(2000)
# Check for request ID pattern
req_id_pattern = re.compile(r'req_\d{4}')
has_req_id = any(req_id_pattern.search(m) for m in console_messages)
assert has_req_id, "Console should show request ID like req_0001"
def test_successful_request_shows_checkmark(self, page: Page, app_server: str):
"""Successful requests should show checkmark in console."""
page.goto(f"{app_server}/wip-overview")
console_messages = []
page.on("console", lambda msg: console_messages.append(msg.text))
# Make request to a working endpoint
page.evaluate("""
(async () => {
try {
await MesApi.get('/api/wip/overview/summary');
} catch (e) {}
})()
""")
page.wait_for_timeout(3000)
# Filter for MesApi logs
mesapi_logs = [m for m in console_messages if '[MesApi]' in m]
# The exact checkmark depends on implementation (✓ or similar)
assert len(mesapi_logs) > 0, "Should have MesApi console logs"

View File

@@ -0,0 +1,216 @@
# -*- coding: utf-8 -*-
"""End-to-end tests for realtime equipment status cache.
Tests the full flow from cache sync to API response.
Requires a running server with --run-e2e flag.
"""
import pytest
import requests
@pytest.mark.e2e
class TestEquipmentStatusCacheSync:
"""Test equipment status cache synchronization."""
def test_health_check_includes_equipment_status_cache(self, health_url):
"""Test health check includes equipment_status_cache status."""
response = requests.get(health_url)
assert response.status_code == 200
data = response.json()
# Should have equipment_status_cache in response
assert 'equipment_status_cache' in data
cache_status = data['equipment_status_cache']
# Should have expected fields
assert 'enabled' in cache_status
assert 'loaded' in cache_status
assert 'count' in cache_status
assert 'updated_at' in cache_status
def test_health_check_includes_workcenter_mapping(self, health_url):
"""Test health check includes workcenter_mapping status."""
response = requests.get(health_url)
assert response.status_code == 200
data = response.json()
# Should have workcenter_mapping in response
assert 'workcenter_mapping' in data
wc_status = data['workcenter_mapping']
# Should have expected fields
assert 'loaded' in wc_status
assert 'workcenter_count' in wc_status
assert 'group_count' in wc_status
@pytest.mark.e2e
class TestMergedQueryApi:
"""Test merged resource status API endpoints."""
def test_resource_status_endpoint(self, api_base_url):
"""Test /api/resource/status endpoint."""
url = f"{api_base_url}/resource/status"
response = requests.get(url)
assert response.status_code == 200
data = response.json()
assert data['success'] is True
assert 'data' in data
assert 'count' in data
# If data exists, verify structure
if data['data']:
record = data['data'][0]
# Should have merged fields
assert 'RESOURCEID' in record
assert 'RESOURCENAME' in record
# Should have workcenter mapping fields
assert 'WORKCENTER_GROUP' in record
assert 'WORKCENTER_SHORT' in record
# Should have realtime status fields
assert 'STATUS_CATEGORY' in record
def test_resource_status_with_workcenter_filter(self, api_base_url):
"""Test /api/resource/status with workcenter_groups filter."""
url = f"{api_base_url}/resource/status"
response = requests.get(url, params={'workcenter_groups': '焊接'})
assert response.status_code == 200
data = response.json()
assert data['success'] is True
# All results should be in the specified group
for record in data['data']:
# May be None if mapping not found
if record.get('WORKCENTER_GROUP'):
assert record['WORKCENTER_GROUP'] == '焊接'
def test_resource_status_with_production_filter(self, api_base_url):
"""Test /api/resource/status with is_production filter."""
url = f"{api_base_url}/resource/status"
response = requests.get(url, params={'is_production': 'true'})
assert response.status_code == 200
data = response.json()
assert data['success'] is True
def test_resource_status_with_status_category_filter(self, api_base_url):
"""Test /api/resource/status with status_categories filter."""
url = f"{api_base_url}/resource/status"
response = requests.get(url, params={'status_categories': 'PRODUCTIVE,DOWN'})
assert response.status_code == 200
data = response.json()
assert data['success'] is True
# All results should be in specified categories
for record in data['data']:
if record.get('STATUS_CATEGORY'):
assert record['STATUS_CATEGORY'] in ['PRODUCTIVE', 'DOWN']
def test_resource_status_summary_endpoint(self, api_base_url):
"""Test /api/resource/status/summary endpoint."""
url = f"{api_base_url}/resource/status/summary"
response = requests.get(url)
assert response.status_code == 200
data = response.json()
assert data['success'] is True
assert 'data' in data
summary = data['data']
assert 'total_count' in summary
assert 'by_status_category' in summary
assert 'by_workcenter_group' in summary
assert 'with_active_job' in summary
assert 'with_wip' in summary
def test_resource_status_matrix_endpoint(self, api_base_url):
"""Test /api/resource/status/matrix endpoint."""
url = f"{api_base_url}/resource/status/matrix"
response = requests.get(url)
assert response.status_code == 200
data = response.json()
assert data['success'] is True
assert 'data' in data
# If data exists, verify structure
if data['data']:
row = data['data'][0]
assert 'workcenter_group' in row
assert 'workcenter_sequence' in row
assert 'total' in row
# Should have standard status columns
assert 'PRD' in row
assert 'SBY' in row
assert 'UDT' in row
assert 'SDT' in row
assert 'EGT' in row
assert 'NST' in row
assert 'OTHER' in row
@pytest.mark.e2e
class TestFilterOptionsIncludeNewFields:
"""Test filter options API includes new fields."""
def test_status_options_endpoint(self, api_base_url):
"""Test /api/resource/status/options endpoint."""
url = f"{api_base_url}/resource/status/options"
response = requests.get(url)
assert response.status_code == 200
data = response.json()
assert data['success'] is True
assert 'data' in data
options = data['data']
# Should have workcenter_groups
assert 'workcenter_groups' in options
assert isinstance(options['workcenter_groups'], list)
# Should have status_categories
assert 'status_categories' in options
assert isinstance(options['status_categories'], list)
@pytest.mark.e2e
@pytest.mark.redis
class TestCacheIntegration:
"""Test cache integration (requires Redis)."""
def test_cache_data_consistency(self, api_base_url, health_url):
"""Test cache data is consistent between health and API."""
# Get health status
health_resp = requests.get(health_url)
health_data = health_resp.json()
cache_status = health_data.get('equipment_status_cache', {})
if not cache_status.get('enabled') or not cache_status.get('loaded'):
pytest.skip("Equipment status cache not enabled or loaded")
cache_count = cache_status.get('count', 0)
# Get all equipment status via API
api_resp = requests.get(f"{api_base_url}/resource/status")
api_data = api_resp.json()
# Count should be consistent (within reasonable margin for filtering)
api_count = api_data.get('count', 0)
# API may have filters applied from resource-cache, so it could be less
# but should never exceed cache count
assert api_count <= cache_count or cache_count == 0

View File

@@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
"""End-to-end tests for Resource Cache functionality.
These tests require a running server with Redis enabled.
Run with: pytest tests/e2e/test_resource_cache_e2e.py -v --run-e2e
"""
import pytest
import requests
@pytest.mark.e2e
class TestHealthEndpointResourceCacheE2E:
"""E2E tests for /health endpoint resource cache status."""
def test_health_includes_resource_cache(self, health_url):
"""Test health endpoint includes resource_cache field."""
response = requests.get(health_url, timeout=10)
assert response.status_code in [200, 503]
data = response.json()
assert 'resource_cache' in data
def test_resource_cache_has_required_fields(self, health_url):
"""Test resource_cache has all required fields."""
response = requests.get(health_url, timeout=10)
data = response.json()
rc = data['resource_cache']
assert 'enabled' in rc
if rc['enabled']:
assert 'loaded' in rc
assert 'count' in rc
assert 'version' in rc
assert 'updated_at' in rc
def test_resource_cache_loaded_has_positive_count(self, health_url):
"""Test resource cache has positive count when loaded."""
response = requests.get(health_url, timeout=10)
data = response.json()
rc = data['resource_cache']
if rc.get('enabled') and rc.get('loaded'):
assert rc['count'] > 0, "Resource cache should have data when loaded"
@pytest.mark.e2e
@pytest.mark.redis
class TestResourceHistoryOptionsE2E:
"""E2E tests for resource history filter options endpoint."""
def test_options_endpoint_accessible(self, api_base_url):
"""Test resource history options endpoint is accessible."""
response = requests.get(
f"{api_base_url}/resource/history/options",
timeout=30
)
assert response.status_code == 200
def test_options_returns_families(self, api_base_url):
"""Test options endpoint returns families list."""
response = requests.get(
f"{api_base_url}/resource/history/options",
timeout=30
)
assert response.status_code == 200
data = response.json()
if data.get('success'):
options = data.get('data', {})
assert 'families' in options
assert isinstance(options['families'], list)
def test_options_returns_workcenter_groups(self, api_base_url):
"""Test options endpoint returns workcenter groups."""
response = requests.get(
f"{api_base_url}/resource/history/options",
timeout=30
)
assert response.status_code == 200
data = response.json()
if data.get('success'):
options = data.get('data', {})
assert 'workcenter_groups' in options
assert isinstance(options['workcenter_groups'], list)
@pytest.mark.e2e
@pytest.mark.redis
class TestResourceFilterOptionsE2E:
"""E2E tests for resource filter options endpoint."""
def test_filter_options_endpoint_accessible(self, api_base_url):
"""Test resource filter options endpoint is accessible."""
response = requests.get(
f"{api_base_url}/resource/filter_options",
timeout=30
)
assert response.status_code == 200
def test_filter_options_returns_workcenters(self, api_base_url):
"""Test filter options returns workcenters list."""
response = requests.get(
f"{api_base_url}/resource/filter_options",
timeout=30
)
assert response.status_code == 200
data = response.json()
if data.get('success'):
options = data.get('data', {})
assert 'workcenters' in options
assert isinstance(options['workcenters'], list)
def test_filter_options_returns_families(self, api_base_url):
"""Test filter options returns families list."""
response = requests.get(
f"{api_base_url}/resource/filter_options",
timeout=30
)
assert response.status_code == 200
data = response.json()
if data.get('success'):
options = data.get('data', {})
assert 'families' in options
assert isinstance(options['families'], list)
def test_filter_options_returns_departments(self, api_base_url):
"""Test filter options returns departments list."""
response = requests.get(
f"{api_base_url}/resource/filter_options",
timeout=30
)
assert response.status_code == 200
data = response.json()
if data.get('success'):
options = data.get('data', {})
assert 'departments' in options
assert isinstance(options['departments'], list)
def test_filter_options_returns_statuses(self, api_base_url):
"""Test filter options returns statuses list (from Oracle)."""
response = requests.get(
f"{api_base_url}/resource/filter_options",
timeout=30
)
assert response.status_code == 200
data = response.json()
if data.get('success'):
options = data.get('data', {})
assert 'statuses' in options
assert isinstance(options['statuses'], list)
@pytest.mark.e2e
@pytest.mark.redis
class TestResourceCachePerformanceE2E:
"""E2E tests for resource cache performance."""
def test_filter_options_response_time(self, api_base_url):
"""Test filter options responds within acceptable time."""
import time
# First request may trigger cache load
requests.get(f"{api_base_url}/resource/filter_options", timeout=30)
# Second request should be from cache
start = time.time()
response = requests.get(f"{api_base_url}/resource/filter_options", timeout=30)
elapsed = time.time() - start
assert response.status_code == 200
# Note: statuses still queries Oracle, so allow more time
# Other fields (workcenters, families, departments) come from Redis cache
assert elapsed < 30.0, f"Response took {elapsed:.2f}s, expected < 30s"
def test_history_options_response_time(self, api_base_url):
"""Test history options responds within acceptable time."""
import time
# First request
requests.get(f"{api_base_url}/resource/history/options", timeout=30)
# Second request should be from cache
start = time.time()
response = requests.get(f"{api_base_url}/resource/history/options", timeout=30)
elapsed = time.time() - start
assert response.status_code == 200
# Should be fast (< 2 seconds)
assert elapsed < 2.0, f"Response took {elapsed:.2f}s, expected < 2s"
@pytest.mark.e2e
@pytest.mark.redis
class TestResourceCacheDataConsistencyE2E:
"""E2E tests for resource cache data consistency."""
def test_cache_count_matches_health_report(self, health_url, api_base_url):
"""Test cache count in health matches actual data count."""
# Get health status
health_resp = requests.get(health_url, timeout=10)
health_data = health_resp.json()
rc = health_data.get('resource_cache', {})
if not rc.get('enabled') or not rc.get('loaded'):
pytest.skip("Resource cache not enabled or loaded")
reported_count = rc.get('count', 0)
# Get filter options which uses cached data
options_resp = requests.get(f"{api_base_url}/resource/filter_options", timeout=30)
options_data = options_resp.json()
# The workcenters list should be derived from the same cache
if options_data.get('success'):
workcenters = options_data.get('data', {}).get('workcenters', [])
# Just verify we got data - exact count comparison is complex
assert len(workcenters) > 0 or reported_count == 0
def test_families_consistent_across_endpoints(self, api_base_url):
"""Test families list is consistent across endpoints."""
# Get from resource filter options
filter_resp = requests.get(f"{api_base_url}/resource/filter_options", timeout=30)
filter_data = filter_resp.json()
# Get from resource history options
history_resp = requests.get(f"{api_base_url}/resource/history/options", timeout=30)
history_data = history_resp.json()
if filter_data.get('success') and history_data.get('success'):
filter_families = set(filter_data.get('data', {}).get('families', []))
history_families = set(history_data.get('data', {}).get('families', []))
# Both should return the same families (from same cache)
assert filter_families == history_families, \
f"Families mismatch: filter has {len(filter_families)}, history has {len(history_families)}"

View File

@@ -0,0 +1,319 @@
# -*- coding: utf-8 -*-
"""End-to-end tests for resource history analysis page.
These tests simulate real user workflows through the resource history analysis feature.
Run with: pytest tests/e2e/test_resource_history_e2e.py -v --run-integration
"""
import json
import pytest
from unittest.mock import patch, MagicMock
import pandas as pd
from datetime import datetime, timedelta
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
@pytest.fixture
def app():
"""Create application for testing."""
db._ENGINE = None
app = create_app('testing')
app.config['TESTING'] = True
return app
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
class TestResourceHistoryPageAccess:
"""E2E tests for page access and navigation."""
def test_page_loads_successfully(self, client):
"""Resource history page should load without errors."""
response = client.get('/resource-history')
assert response.status_code == 200
content = response.data.decode('utf-8')
assert '設備歷史績效' in content
def test_page_contains_filter_elements(self, client):
"""Page should contain all filter elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
# Check for filter elements
assert 'startDate' in content
assert 'endDate' in content
# Multi-select dropdowns
assert 'workcenterGroupsDropdown' in content
assert 'familiesDropdown' in content
assert 'isProduction' in content
assert 'isKey' in content
assert 'isMonitor' in content
def test_page_contains_kpi_cards(self, client):
"""Page should contain KPI card elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
assert 'kpiOuPct' in content
assert 'kpiAvailabilityPct' in content
assert 'kpiPrdHours' in content
assert 'kpiUdtHours' in content
assert 'kpiSdtHours' in content
assert 'kpiEgtHours' in content
assert 'kpiMachineCount' in content
def test_page_contains_chart_containers(self, client):
"""Page should contain chart container elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
assert 'trendChart' in content
assert 'stackedChart' in content
assert 'comparisonChart' in content
assert 'heatmapChart' in content
def test_page_contains_table_elements(self, client):
"""Page should contain table elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
assert 'detailTableBody' in content
assert 'expandAllBtn' in content
assert 'collapseAllBtn' in content
assert 'exportBtn' in content
class TestResourceHistoryAPIWorkflow:
"""E2E tests for API workflows."""
@patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
@patch('mes_dashboard.services.filter_cache.get_resource_families')
def test_filter_options_workflow(self, mock_families, mock_groups, client):
"""Filter options should be loadable."""
mock_groups.return_value = [
{'name': '焊接_DB', 'sequence': 1},
{'name': '焊接_WB', 'sequence': 2},
{'name': '成型', 'sequence': 4},
]
mock_families.return_value = ['FAM001', 'FAM002']
response = client.get('/api/resource/history/options')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert 'workcenter_groups' in data['data']
assert 'families' in data['data']
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_complete_query_workflow(self, mock_read_sql, client):
"""Complete query workflow should return all data sections."""
# Mock responses for the 4 queries in query_summary
kpi_df = pd.DataFrame([{
'PRD_HOURS': 8000, 'SBY_HOURS': 1000, 'UDT_HOURS': 500,
'SDT_HOURS': 300, 'EGT_HOURS': 200, 'NST_HOURS': 1000,
'MACHINE_COUNT': 100
}])
trend_df = pd.DataFrame([
{'DATA_DATE': datetime(2024, 1, 1), 'PRD_HOURS': 1000, 'SBY_HOURS': 100,
'UDT_HOURS': 50, 'SDT_HOURS': 30, 'EGT_HOURS': 20, 'NST_HOURS': 100, 'MACHINE_COUNT': 100},
{'DATA_DATE': datetime(2024, 1, 2), 'PRD_HOURS': 1100, 'SBY_HOURS': 90,
'UDT_HOURS': 40, 'SDT_HOURS': 25, 'EGT_HOURS': 15, 'NST_HOURS': 100, 'MACHINE_COUNT': 100},
])
heatmap_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 400, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10},
{'WORKCENTERNAME': '成型', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 600, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10},
])
comparison_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'PRD_HOURS': 4000, 'SBY_HOURS': 500,
'UDT_HOURS': 250, 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'MACHINE_COUNT': 50},
{'WORKCENTERNAME': '成型', 'PRD_HOURS': 4000, 'SBY_HOURS': 500,
'UDT_HOURS': 250, 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'MACHINE_COUNT': 50},
])
# Use function-based side_effect for ThreadPoolExecutor parallel queries
def mock_sql(sql):
sql_upper = sql.upper()
if 'DATA_DATE' in sql_upper and 'WORKCENTERNAME' in sql_upper:
return heatmap_df
elif 'DATA_DATE' in sql_upper:
return trend_df
elif 'WORKCENTERNAME' in sql_upper:
return comparison_df
else:
return kpi_df
mock_read_sql.side_effect = mock_sql
response = client.get(
'/api/resource/history/summary'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
'&granularity=day'
)
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
# Verify KPI
assert data['data']['kpi']['ou_pct'] == 80.0
# Availability% = (8000+1000+200) / (8000+1000+200+300+500+1000) * 100 = 9200/11000 = 83.6%
assert data['data']['kpi']['availability_pct'] == 83.6
assert data['data']['kpi']['machine_count'] == 100
# Verify trend
assert len(data['data']['trend']) == 2
# Trend should also have availability_pct
assert 'availability_pct' in data['data']['trend'][0]
# Verify heatmap
assert len(data['data']['heatmap']) == 2
# Verify comparison
assert len(data['data']['workcenter_comparison']) == 2
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_detail_query_workflow(self, mock_read_sql, client):
"""Detail query workflow should return hierarchical data."""
detail_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES002',
'PRD_HOURS': 75, 'SBY_HOURS': 15, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
mock_read_sql.return_value = detail_df
response = client.get(
'/api/resource/history/detail'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
)
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['total'] == 2
assert len(data['data']) == 2
assert data['truncated'] is False
# Verify data structure
first_row = data['data'][0]
assert 'workcenter' in first_row
assert 'family' in first_row
assert 'resource' in first_row
assert 'ou_pct' in first_row
assert 'availability_pct' in first_row
assert 'prd_hours' in first_row
assert 'prd_pct' in first_row
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_export_workflow(self, mock_read_sql, client):
"""Export workflow should return valid CSV."""
mock_read_sql.return_value = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
response = client.get(
'/api/resource/history/export'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
)
assert response.status_code == 200
assert 'text/csv' in response.content_type
content = response.data.decode('utf-8-sig')
lines = content.strip().split('\n')
# Should have header + data rows
assert len(lines) >= 2
# Verify header
header = lines[0]
assert '站點' in header
assert 'OU%' in header
assert 'Availability%' in header
class TestResourceHistoryValidation:
"""E2E tests for input validation."""
def test_date_range_validation(self, client):
"""Date range exceeding 730 days should be rejected."""
response = client.get(
'/api/resource/history/summary'
'?start_date=2024-01-01'
'&end_date=2026-01-02'
)
assert response.status_code == 400
data = json.loads(response.data)
assert data['success'] is False
assert '730' in data['error']
def test_missing_required_params(self, client):
"""Missing required parameters should return error."""
response = client.get('/api/resource/history/summary')
assert response.status_code == 400
data = json.loads(response.data)
assert data['success'] is False
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_granularity_options(self, mock_read_sql, client):
"""Different granularity options should work."""
mock_df = pd.DataFrame([{
'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'MACHINE_COUNT': 5
}])
mock_read_sql.return_value = mock_df
for granularity in ['day', 'week', 'month', 'year']:
mock_read_sql.side_effect = [mock_df, pd.DataFrame(), pd.DataFrame(), pd.DataFrame()]
response = client.get(
f'/api/resource/history/summary'
f'?start_date=2024-01-01'
f'&end_date=2024-01-31'
f'&granularity={granularity}'
)
assert response.status_code == 200, f"Failed for granularity={granularity}"
class TestResourceHistoryNavigation:
"""E2E tests for navigation integration."""
def test_portal_includes_history_tab(self, client):
"""Portal should include resource history tab."""
response = client.get('/')
content = response.data.decode('utf-8')
assert '設備歷史績效' in content
assert 'resourceHistoryFrame' in content
if __name__ == '__main__':
pytest.main([__file__, '-v'])