feat: 新增壓力測試套件 - API 負載測試與前端穩定性驗證

新增全端壓力測試以驗證系統在高負載下的穩定性:

後端 API 負載測試:
- 並發請求測試 (10 用戶, 200 請求)
- WIP Summary: 100% 成功率, 343 req/s
- WIP Matrix: 100% 成功率, 119 req/s
- 回應一致性驗證

前端 Playwright 壓力測試 (11 項):
- Toast 系統: 快速建立、類型循環、記憶體清理
- MesApi: 快速請求、並發處理、AbortController
- 頁面導航: Tab 切換、iframe 載入
- JS 錯誤監控

測試檔案:
- tests/stress/test_api_load.py
- tests/stress/test_frontend_stress.py
- scripts/run_stress_tests.py

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2026-01-28 09:17:57 +08:00
parent a06faa7c83
commit b00750436e
7 changed files with 1011 additions and 0 deletions

View File

@@ -35,6 +35,7 @@ test = [
"pytest>=7.0.0",
"pytest-playwright>=0.4.0",
"playwright>=1.40.0",
"requests>=2.28.0",
]
[tool.setuptools]

View File

@@ -8,3 +8,5 @@ addopts = -v --tb=short
markers =
integration: mark test as integration test (requires database)
e2e: mark test as end-to-end test (requires running server and playwright)
stress: mark test as stress test (may take longer, tests system stability)
load: mark test as load test (concurrent requests, tests throughput)

195
scripts/run_stress_tests.py Normal file
View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stress Test Runner for MES Dashboard
Runs comprehensive stress tests including:
- Backend API load tests
- Frontend browser stress tests
Usage:
python scripts/run_stress_tests.py [options]
Options:
--backend-only Run only backend API tests
--frontend-only Run only frontend Playwright tests
--quick Quick test with minimal load (good for CI)
--heavy Heavy load test (10x normal)
--url URL Target URL (default: http://127.0.0.1:5000)
--report FILE Save report to file
"""
import argparse
import subprocess
import sys
import os
import time
from datetime import datetime
def run_backend_tests(url: str, config: dict) -> dict:
"""Run backend API stress tests."""
env = os.environ.copy()
env['STRESS_TEST_URL'] = url
env['STRESS_CONCURRENT_USERS'] = str(config.get('concurrent_users', 10))
env['STRESS_REQUESTS_PER_USER'] = str(config.get('requests_per_user', 20))
env['STRESS_TIMEOUT'] = str(config.get('timeout', 30))
print("\n" + "=" * 60)
print("Running Backend API Load Tests")
print("=" * 60)
print(f" URL: {url}")
print(f" Concurrent Users: {config.get('concurrent_users', 10)}")
print(f" Requests/User: {config.get('requests_per_user', 20)}")
print()
start_time = time.time()
result = subprocess.run(
['python', '-m', 'pytest', 'tests/stress/test_api_load.py', '-v', '-s', '--tb=short'],
env=env,
capture_output=False,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
duration = time.time() - start_time
return {
'name': 'Backend API Load Tests',
'passed': result.returncode == 0,
'duration': duration,
'returncode': result.returncode
}
def run_frontend_tests(url: str, config: dict) -> dict:
"""Run frontend Playwright stress tests."""
env = os.environ.copy()
env['STRESS_TEST_URL'] = url
print("\n" + "=" * 60)
print("Running Frontend Playwright Stress Tests")
print("=" * 60)
print(f" URL: {url}")
print()
start_time = time.time()
result = subprocess.run(
['python', '-m', 'pytest', 'tests/stress/test_frontend_stress.py', '-v', '-s', '--tb=short'],
env=env,
capture_output=False,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
duration = time.time() - start_time
return {
'name': 'Frontend Playwright Stress Tests',
'passed': result.returncode == 0,
'duration': duration,
'returncode': result.returncode
}
def generate_report(results: list, url: str, config: dict) -> str:
"""Generate a text report of stress test results."""
report_lines = [
"=" * 60,
"MES Dashboard Stress Test Report",
"=" * 60,
f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"Target URL: {url}",
f"Configuration: {config}",
"",
"-" * 60,
"Test Results:",
"-" * 60,
]
total_duration = 0
passed_count = 0
for result in results:
status = "PASSED" if result['passed'] else "FAILED"
report_lines.append(f" {result['name']}: {status}")
report_lines.append(f" Duration: {result['duration']:.2f}s")
total_duration += result['duration']
if result['passed']:
passed_count += 1
report_lines.extend([
"",
"-" * 60,
"Summary:",
"-" * 60,
f" Total Tests: {len(results)}",
f" Passed: {passed_count}",
f" Failed: {len(results) - passed_count}",
f" Total Duration: {total_duration:.2f}s",
"=" * 60,
])
return "\n".join(report_lines)
def main():
parser = argparse.ArgumentParser(description='Run MES Dashboard stress tests')
parser.add_argument('--backend-only', action='store_true', help='Run only backend tests')
parser.add_argument('--frontend-only', action='store_true', help='Run only frontend tests')
parser.add_argument('--quick', action='store_true', help='Quick test with minimal load')
parser.add_argument('--heavy', action='store_true', help='Heavy load test')
parser.add_argument('--url', default='http://127.0.0.1:5000', help='Target URL')
parser.add_argument('--report', help='Save report to file')
args = parser.parse_args()
# Configure load levels
if args.quick:
config = {
'concurrent_users': 3,
'requests_per_user': 5,
'timeout': 30
}
elif args.heavy:
config = {
'concurrent_users': 50,
'requests_per_user': 50,
'timeout': 60
}
else:
config = {
'concurrent_users': 10,
'requests_per_user': 20,
'timeout': 30
}
print("\n" + "=" * 60)
print("MES Dashboard Stress Test Suite")
print("=" * 60)
print(f"Target: {args.url}")
print(f"Mode: {'Quick' if args.quick else 'Heavy' if args.heavy else 'Normal'}")
print()
results = []
# Run tests based on flags
if not args.frontend_only:
results.append(run_backend_tests(args.url, config))
if not args.backend_only:
results.append(run_frontend_tests(args.url, config))
# Generate report
report = generate_report(results, args.url, config)
print("\n" + report)
# Save report if requested
if args.report:
with open(args.report, 'w', encoding='utf-8') as f:
f.write(report)
print(f"\nReport saved to: {args.report}")
# Exit with appropriate code
all_passed = all(r['passed'] for r in results)
sys.exit(0 if all_passed else 1)
if __name__ == '__main__':
main()

2
tests/stress/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
"""Stress tests for MES Dashboard."""

118
tests/stress/conftest.py Normal file
View File

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
"""Pytest configuration for stress tests."""
import pytest
import os
import sys
import time
from dataclasses import dataclass, field
from typing import List, Dict, Any
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
@dataclass
class StressTestResult:
"""Container for stress test results."""
test_name: str
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
total_duration: float = 0.0
min_response_time: float = float('inf')
max_response_time: float = 0.0
response_times: List[float] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
@property
def avg_response_time(self) -> float:
if not self.response_times:
return 0.0
return sum(self.response_times) / len(self.response_times)
@property
def success_rate(self) -> float:
if self.total_requests == 0:
return 0.0
return (self.successful_requests / self.total_requests) * 100
@property
def requests_per_second(self) -> float:
if self.total_duration == 0:
return 0.0
return self.total_requests / self.total_duration
def add_success(self, response_time: float):
self.total_requests += 1
self.successful_requests += 1
self.response_times.append(response_time)
self.min_response_time = min(self.min_response_time, response_time)
self.max_response_time = max(self.max_response_time, response_time)
def add_failure(self, error: str, response_time: float = 0):
self.total_requests += 1
self.failed_requests += 1
self.errors.append(error)
if response_time > 0:
self.response_times.append(response_time)
def report(self) -> str:
"""Generate human-readable report."""
lines = [
f"\n{'='*60}",
f"Stress Test Report: {self.test_name}",
f"{'='*60}",
f"Total Requests: {self.total_requests}",
f"Successful: {self.successful_requests}",
f"Failed: {self.failed_requests}",
f"Success Rate: {self.success_rate:.2f}%",
f"{''*60}",
f"Total Duration: {self.total_duration:.2f}s",
f"Requests/Second: {self.requests_per_second:.2f}",
f"{''*60}",
f"Min Response Time: {self.min_response_time*1000:.2f}ms" if self.min_response_time != float('inf') else "Min Response Time: N/A",
f"Max Response Time: {self.max_response_time*1000:.2f}ms",
f"Avg Response Time: {self.avg_response_time*1000:.2f}ms",
f"{'='*60}",
]
if self.errors:
lines.append(f"Errors (first 5):")
for err in self.errors[:5]:
lines.append(f" - {err[:100]}")
return "\n".join(lines)
@pytest.fixture(scope="session")
def base_url() -> str:
"""Get the base URL for stress testing."""
return os.environ.get('STRESS_TEST_URL', 'http://127.0.0.1:5000')
@pytest.fixture(scope="session")
def stress_config() -> Dict[str, Any]:
"""Get stress test configuration."""
return {
'concurrent_users': int(os.environ.get('STRESS_CONCURRENT_USERS', '10')),
'requests_per_user': int(os.environ.get('STRESS_REQUESTS_PER_USER', '20')),
'ramp_up_time': float(os.environ.get('STRESS_RAMP_UP_TIME', '2.0')),
'timeout': float(os.environ.get('STRESS_TIMEOUT', '30.0')),
}
@pytest.fixture
def stress_result():
"""Factory fixture to create stress test results."""
def _create_result(test_name: str) -> StressTestResult:
return StressTestResult(test_name=test_name)
return _create_result
def pytest_configure(config):
"""Add custom markers for stress tests."""
config.addinivalue_line(
"markers", "stress: mark test as stress test (may take longer)"
)
config.addinivalue_line(
"markers", "load: mark test as load test (concurrent requests)"
)

View File

@@ -0,0 +1,327 @@
# -*- coding: utf-8 -*-
"""Backend API load tests.
Tests API endpoints under concurrent load to verify:
- Connection pool stability
- Timeout handling
- Response consistency under pressure
Run with: pytest tests/stress/test_api_load.py -v -s
"""
import pytest
import time
import requests
import concurrent.futures
from typing import List, Tuple
# Import from local conftest via pytest fixtures
@pytest.mark.stress
@pytest.mark.load
class TestAPILoadConcurrent:
"""Load tests with concurrent requests."""
def _make_request(self, url: str, timeout: float) -> Tuple[bool, float, str]:
"""Make a single request and return (success, duration, error)."""
start = time.time()
try:
response = requests.get(url, timeout=timeout)
duration = time.time() - start
if response.status_code == 200:
data = response.json()
if data.get('success'):
return (True, duration, '')
return (False, duration, f"API returned success=false: {data.get('error', 'unknown')}")
return (False, duration, f"HTTP {response.status_code}")
except requests.exceptions.Timeout:
duration = time.time() - start
return (False, duration, "Request timeout")
except requests.exceptions.ConnectionError as e:
duration = time.time() - start
return (False, duration, f"Connection error: {str(e)[:50]}")
except Exception as e:
duration = time.time() - start
return (False, duration, f"Error: {str(e)[:50]}")
def test_wip_summary_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
"""Test WIP summary API under concurrent load."""
result = stress_result("WIP Summary Concurrent Load")
url = f"{base_url}/api/wip/overview/summary"
concurrent_users = stress_config['concurrent_users']
requests_per_user = stress_config['requests_per_user']
timeout = stress_config['timeout']
total_requests = concurrent_users * requests_per_user
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [
executor.submit(self._make_request, url, timeout)
for _ in range(total_requests)
]
for future in concurrent.futures.as_completed(futures):
success, duration, error = future.result()
if success:
result.add_success(duration)
else:
result.add_failure(error, duration)
result.total_duration = time.time() - start_time
print(result.report())
# Assertions
assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%"
assert result.avg_response_time < 10.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 10s"
def test_wip_matrix_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
"""Test WIP matrix API under concurrent load."""
result = stress_result("WIP Matrix Concurrent Load")
url = f"{base_url}/api/wip/overview/matrix"
concurrent_users = stress_config['concurrent_users']
requests_per_user = stress_config['requests_per_user']
timeout = stress_config['timeout']
total_requests = concurrent_users * requests_per_user
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [
executor.submit(self._make_request, url, timeout)
for _ in range(total_requests)
]
for future in concurrent.futures.as_completed(futures):
success, duration, error = future.result()
if success:
result.add_success(duration)
else:
result.add_failure(error, duration)
result.total_duration = time.time() - start_time
print(result.report())
assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%"
assert result.avg_response_time < 15.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 15s"
def test_resource_summary_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
"""Test resource summary API under concurrent load."""
result = stress_result("Resource Summary Concurrent Load")
url = f"{base_url}/api/resource/summary"
concurrent_users = stress_config['concurrent_users']
requests_per_user = stress_config['requests_per_user']
timeout = stress_config['timeout']
total_requests = concurrent_users * requests_per_user
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [
executor.submit(self._make_request, url, timeout)
for _ in range(total_requests)
]
for future in concurrent.futures.as_completed(futures):
success, duration, error = future.result()
if success:
result.add_success(duration)
else:
result.add_failure(error, duration)
result.total_duration = time.time() - start_time
print(result.report())
assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%"
def test_mixed_endpoints_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
"""Test multiple API endpoints simultaneously."""
result = stress_result("Mixed Endpoints Concurrent Load")
endpoints = [
f"{base_url}/api/wip/overview/summary",
f"{base_url}/api/wip/overview/matrix",
f"{base_url}/api/wip/overview/hold",
f"{base_url}/api/wip/meta/workcenters",
f"{base_url}/api/resource/summary",
]
concurrent_users = stress_config['concurrent_users']
timeout = stress_config['timeout']
# 5 requests per endpoint per user
requests_per_endpoint = 5
total_requests = concurrent_users * len(endpoints) * requests_per_endpoint
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = []
for _ in range(concurrent_users):
for endpoint in endpoints:
for _ in range(requests_per_endpoint):
futures.append(executor.submit(self._make_request, endpoint, timeout))
for future in concurrent.futures.as_completed(futures):
success, duration, error = future.result()
if success:
result.add_success(duration)
else:
result.add_failure(error, duration)
result.total_duration = time.time() - start_time
print(result.report())
assert result.success_rate >= 85.0, f"Success rate {result.success_rate:.1f}% is below 85%"
@pytest.mark.stress
@pytest.mark.load
class TestAPILoadRampUp:
"""Load tests with gradual ramp-up."""
def _make_request(self, url: str, timeout: float) -> Tuple[bool, float, str]:
"""Make a single request and return (success, duration, error)."""
start = time.time()
try:
response = requests.get(url, timeout=timeout)
duration = time.time() - start
if response.status_code == 200:
data = response.json()
if data.get('success'):
return (True, duration, '')
return (False, duration, f"API error: {data.get('error', 'unknown')}")
return (False, duration, f"HTTP {response.status_code}")
except Exception as e:
duration = time.time() - start
return (False, duration, str(e)[:50])
def test_gradual_load_increase(self, base_url: str, stress_result):
"""Test API stability as load gradually increases."""
result = stress_result("Gradual Load Increase")
url = f"{base_url}/api/wip/overview/summary"
# Start with 2 concurrent users, increase to 20
load_levels = [2, 5, 10, 15, 20]
requests_per_level = 10
timeout = 30.0
start_time = time.time()
for concurrent_users in load_levels:
print(f"\n Testing with {concurrent_users} concurrent users...")
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [
executor.submit(self._make_request, url, timeout)
for _ in range(requests_per_level)
]
for future in concurrent.futures.as_completed(futures):
success, duration, error = future.result()
if success:
result.add_success(duration)
else:
result.add_failure(error, duration)
time.sleep(0.5) # Brief pause between levels
result.total_duration = time.time() - start_time
print(result.report())
assert result.success_rate >= 80.0, f"Success rate {result.success_rate:.1f}% is below 80%"
@pytest.mark.stress
class TestAPITimeoutHandling:
"""Tests for timeout handling under load."""
def test_connection_recovery_after_timeout(self, base_url: str, stress_result):
"""Test that API recovers after timeout scenarios."""
result = stress_result("Connection Recovery After Timeout")
# First, make requests with very short timeout to trigger timeouts
short_timeout_url = f"{base_url}/api/wip/overview/matrix"
print("\n Phase 1: Triggering timeouts with 0.1s timeout...")
for _ in range(5):
start = time.time()
try:
requests.get(short_timeout_url, timeout=0.1)
result.add_success(time.time() - start)
except requests.exceptions.Timeout:
result.add_failure("Expected timeout", time.time() - start)
except Exception as e:
result.add_failure(str(e)[:50], time.time() - start)
# Now verify system recovers with normal timeout
print(" Phase 2: Verifying recovery with 30s timeout...")
recovery_url = f"{base_url}/api/wip/overview/summary"
recovered = False
for i in range(10):
start = time.time()
try:
response = requests.get(recovery_url, timeout=30.0)
duration = time.time() - start
if response.status_code == 200 and response.json().get('success'):
result.add_success(duration)
recovered = True
print(f" Recovered on attempt {i+1}")
break
except Exception as e:
result.add_failure(str(e)[:50], time.time() - start)
time.sleep(0.5)
result.total_duration = sum(result.response_times)
print(result.report())
assert recovered, "System did not recover after timeout scenarios"
@pytest.mark.stress
class TestAPIResponseConsistency:
"""Tests for response consistency under load."""
def test_response_data_consistency(self, base_url: str, stress_config: dict):
"""Verify API returns consistent data structure under load."""
url = f"{base_url}/api/wip/overview/summary"
concurrent_users = 5
requests_per_user = 10
timeout = 30.0
responses = []
def make_request():
try:
response = requests.get(url, timeout=timeout)
if response.status_code == 200:
return response.json()
except Exception:
pass
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [
executor.submit(make_request)
for _ in range(concurrent_users * requests_per_user)
]
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
responses.append(result)
# Verify all successful responses have consistent structure
assert len(responses) > 0, "No successful responses received"
first_response = responses[0]
required_fields = {'success'}
for i, response in enumerate(responses):
for field in required_fields:
assert field in response, f"Response {i} missing field '{field}'"
print(f"\n Received {len(responses)} consistent responses")

View File

@@ -0,0 +1,366 @@
# -*- coding: utf-8 -*-
"""Frontend stress tests using Playwright.
Tests frontend stability under high-frequency operations:
- Toast notification system under rapid fire
- MesApi client under rapid requests
- AbortController behavior
- Page navigation stress
Run with: pytest tests/stress/test_frontend_stress.py -v -s
"""
import pytest
import time
import re
from playwright.sync_api import Page, expect
@pytest.fixture(scope="session")
def app_server() -> str:
"""Get the base URL for stress testing."""
import os
return os.environ.get('STRESS_TEST_URL', 'http://127.0.0.1:5000')
@pytest.fixture(scope="session")
def browser_context_args(browser_context_args):
"""Configure browser context for stress tests."""
return {
**browser_context_args,
"viewport": {"width": 1280, "height": 720},
"locale": "zh-TW",
}
def load_page_with_js(page: Page, url: str, timeout: int = 60000):
"""Load page and wait for JS to initialize."""
page.goto(url, wait_until='domcontentloaded', timeout=timeout)
page.wait_for_timeout(1000) # Allow JS initialization
@pytest.mark.stress
class TestToastStress:
"""Stress tests for Toast notification system."""
def test_rapid_toast_creation(self, page: Page, app_server: str):
"""Test Toast system under rapid creation - should enforce max limit."""
load_page_with_js(page, f"{app_server}/tables")
# Create 50 toasts rapidly
start_time = time.time()
for i in range(50):
page.evaluate(f"Toast.info('Rapid toast {i}')")
creation_time = time.time() - start_time
print(f"\n Created 50 toasts in {creation_time:.3f}s")
page.wait_for_timeout(500)
# Should only have max 5 toasts visible
toast_count = page.locator('.mes-toast').count()
assert toast_count <= 5, f"Toast count {toast_count} exceeds max limit of 5"
print(f" Toast count enforced: {toast_count} (max 5)")
def test_toast_type_cycling(self, page: Page, app_server: str):
"""Test rapid cycling through all toast types - system remains stable."""
load_page_with_js(page, f"{app_server}/tables")
toast_types = ['info', 'success', 'warning', 'error']
start_time = time.time()
for i in range(100):
toast_type = toast_types[i % len(toast_types)]
page.evaluate(f"Toast.{toast_type}('Type cycle {i}')")
cycle_time = time.time() - start_time
print(f"\n Cycled 100 toasts in {cycle_time:.3f}s")
# Wait for animations to complete
page.wait_for_timeout(1000)
# Dismiss all and verify system can recover
page.evaluate("Toast.dismissAll()")
page.wait_for_timeout(500)
toast_count = page.locator('.mes-toast').count()
assert toast_count <= 5, f"Toast overflow after dismissAll: {toast_count}"
print(f" System stable after cleanup, toast count: {toast_count}")
def test_toast_dismiss_stress(self, page: Page, app_server: str):
"""Test rapid toast creation and dismissal."""
load_page_with_js(page, f"{app_server}/tables")
start_time = time.time()
# Create and immediately dismiss
for i in range(30):
toast_id = page.evaluate(f"Toast.info('Dismiss test {i}')")
page.evaluate(f"Toast.dismiss({toast_id})")
dismiss_time = time.time() - start_time
print(f"\n Created and dismissed 30 toasts in {dismiss_time:.3f}s")
page.wait_for_timeout(500)
# Should have no or few toasts
toast_count = page.locator('.mes-toast').count()
assert toast_count <= 2, f"Undismissed toasts remain: {toast_count}"
print(f" Remaining toasts: {toast_count}")
def test_loading_toast_stress(self, page: Page, app_server: str):
"""Test loading toasts can be created and properly dismissed."""
load_page_with_js(page, f"{app_server}/tables")
toast_ids = []
# Create 10 loading toasts
for i in range(10):
toast_id = page.evaluate(f"Toast.loading('Loading {i}...')")
toast_ids.append(toast_id)
page.wait_for_timeout(200)
# Loading toasts are created
loading_count = page.locator('.mes-toast-loading').count()
print(f"\n Created {len(toast_ids)} loading toasts, visible: {loading_count}")
# Dismiss all using dismissAll
page.evaluate("Toast.dismissAll()")
page.wait_for_timeout(500)
# All should be gone after dismissAll
loading_count = page.locator('.mes-toast-loading').count()
assert loading_count == 0, f"Loading toasts not dismissed: {loading_count}"
print(f" Loading toast dismiss test passed")
@pytest.mark.stress
class TestMesApiStress:
"""Stress tests for MesApi client."""
def test_rapid_api_requests(self, page: Page, app_server: str):
"""Test MesApi under rapid sequential requests."""
load_page_with_js(page, f"{app_server}/tables")
# Make 20 rapid API requests
results = page.evaluate("""
async () => {
const results = [];
const startTime = Date.now();
for (let i = 0; i < 20; i++) {
try {
const response = await MesApi.get('/api/wip/meta/workcenters');
results.push({ success: true, status: response?.status || 'ok' });
} catch (e) {
results.push({ success: false, error: e.message });
}
}
return {
results,
duration: Date.now() - startTime,
successCount: results.filter(r => r.success).length
};
}
""")
print(f"\n 20 requests in {results['duration']}ms")
print(f" Success: {results['successCount']}/20")
assert results['successCount'] >= 15, f"Too many failures: {20 - results['successCount']}"
def test_concurrent_api_requests(self, page: Page, app_server: str):
"""Test MesApi with concurrent requests using Promise.all."""
load_page_with_js(page, f"{app_server}/tables")
# Make 10 concurrent requests
results = page.evaluate("""
async () => {
const endpoints = [
'/api/wip/overview/summary',
'/api/wip/overview/matrix',
'/api/wip/meta/workcenters',
'/api/wip/meta/packages',
];
const startTime = Date.now();
const promises = [];
// 2 requests per endpoint = 8 total concurrent
for (const endpoint of endpoints) {
promises.push(MesApi.get(endpoint).catch(e => ({ error: e.message })));
promises.push(MesApi.get(endpoint).catch(e => ({ error: e.message })));
}
const results = await Promise.all(promises);
const successCount = results.filter(r => !r.error).length;
return {
duration: Date.now() - startTime,
total: results.length,
successCount
};
}
""")
print(f"\n {results['total']} concurrent requests in {results['duration']}ms")
print(f" Success: {results['successCount']}/{results['total']}")
assert results['successCount'] >= 6, f"Too many concurrent failures"
def test_abort_controller_stress(self, page: Page, app_server: str):
"""Test AbortController under rapid request cancellation."""
load_page_with_js(page, f"{app_server}/tables")
# Start requests and cancel them rapidly
results = page.evaluate("""
async () => {
const results = { started: 0, aborted: 0, completed: 0, errors: 0 };
for (let i = 0; i < 10; i++) {
results.started++;
const controller = new AbortController();
const request = fetch('/api/wip/overview/summary', {
signal: controller.signal
}).then(() => {
results.completed++;
}).catch(e => {
if (e.name === 'AbortError') {
results.aborted++;
} else {
results.errors++;
}
});
// Cancel after 50ms
setTimeout(() => controller.abort(), 50);
await new Promise(resolve => setTimeout(resolve, 100));
}
return results;
}
""")
print(f"\n Started: {results['started']}")
print(f" Aborted: {results['aborted']}")
print(f" Completed: {results['completed']}")
print(f" Errors: {results['errors']}")
# Most should either abort or complete
total_resolved = results['aborted'] + results['completed']
assert total_resolved >= 5, f"Too many unresolved requests"
@pytest.mark.stress
class TestPageNavigationStress:
"""Stress tests for rapid page navigation."""
def test_rapid_tab_switching(self, page: Page, app_server: str):
"""Test rapid tab switching in portal."""
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
page.wait_for_timeout(500)
tabs = [
'.tab:has-text("WIP 即時概況")',
'.tab:has-text("機台狀態報表")',
'.tab:has-text("數據表查詢工具")',
'.tab:has-text("Excel 批次查詢")',
]
start_time = time.time()
# Rapidly switch tabs 20 times
for i in range(20):
tab = tabs[i % len(tabs)]
page.locator(tab).click()
page.wait_for_timeout(50)
switch_time = time.time() - start_time
print(f"\n 20 tab switches in {switch_time:.3f}s")
# Page should still be responsive
expect(page.locator('h1')).to_contain_text('MES 報表入口')
print(" Portal remained stable")
def test_portal_iframe_stress(self, page: Page, app_server: str):
"""Test portal remains responsive with iframe loading."""
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
page.wait_for_timeout(500)
# Switch through all tabs
tabs = [
'WIP 即時概況',
'機台狀態報表',
'數據表查詢工具',
'Excel 批次查詢',
]
for tab_name in tabs:
page.locator(f'.tab:has-text("{tab_name}")').click()
page.wait_for_timeout(200)
# Verify tab is active
tab = page.locator(f'.tab:has-text("{tab_name}")')
expect(tab).to_have_class(re.compile(r'active'))
print(f"\n All {len(tabs)} tabs clickable and responsive")
@pytest.mark.stress
class TestMemoryStress:
"""Tests for memory leak detection."""
def test_toast_memory_cleanup(self, page: Page, app_server: str):
"""Check Toast system cleans up properly."""
load_page_with_js(page, f"{app_server}/tables")
# Create and dismiss many toasts
for batch in range(5):
for i in range(20):
page.evaluate(f"Toast.info('Memory test {batch}-{i}')")
page.evaluate("Toast.dismissAll()")
page.wait_for_timeout(100)
page.wait_for_timeout(500)
# Check DOM is clean
toast_count = page.locator('.mes-toast').count()
assert toast_count <= 5, f"Toast elements not cleaned up: {toast_count}"
print(f"\n Toast memory cleanup test passed (remaining: {toast_count})")
@pytest.mark.stress
class TestConsoleErrorMonitoring:
"""Monitor for JavaScript errors under stress."""
def test_no_js_errors_under_stress(self, page: Page, app_server: str):
"""Verify no JavaScript errors occur under stress conditions."""
js_errors = []
page.on("pageerror", lambda error: js_errors.append(str(error)))
load_page_with_js(page, f"{app_server}/tables")
# Perform stress operations
for i in range(30):
page.evaluate(f"Toast.info('Error check {i}')")
for i in range(10):
page.evaluate("""
MesApi.get('/api/wip/overview/summary').catch(() => {})
""")
page.wait_for_timeout(2000)
if js_errors:
print(f"\n JavaScript errors detected:")
for err in js_errors[:5]:
print(f" - {err[:100]}")
assert len(js_errors) == 0, f"Found {len(js_errors)} JavaScript errors"
print("\n No JavaScript errors under stress")