diff --git a/pyproject.toml b/pyproject.toml index 5ce718a..bc9e180 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ test = [ "pytest>=7.0.0", "pytest-playwright>=0.4.0", "playwright>=1.40.0", + "requests>=2.28.0", ] [tool.setuptools] diff --git a/pytest.ini b/pytest.ini index 4959381..8f06f60 100644 --- a/pytest.ini +++ b/pytest.ini @@ -8,3 +8,5 @@ addopts = -v --tb=short markers = integration: mark test as integration test (requires database) e2e: mark test as end-to-end test (requires running server and playwright) + stress: mark test as stress test (may take longer, tests system stability) + load: mark test as load test (concurrent requests, tests throughput) diff --git a/scripts/run_stress_tests.py b/scripts/run_stress_tests.py new file mode 100644 index 0000000..2c9217c --- /dev/null +++ b/scripts/run_stress_tests.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Stress Test Runner for MES Dashboard + +Runs comprehensive stress tests including: +- Backend API load tests +- Frontend browser stress tests + +Usage: + python scripts/run_stress_tests.py [options] + +Options: + --backend-only Run only backend API tests + --frontend-only Run only frontend Playwright tests + --quick Quick test with minimal load (good for CI) + --heavy Heavy load test (10x normal) + --url URL Target URL (default: http://127.0.0.1:5000) + --report FILE Save report to file +""" + +import argparse +import subprocess +import sys +import os +import time +from datetime import datetime + + +def run_backend_tests(url: str, config: dict) -> dict: + """Run backend API stress tests.""" + env = os.environ.copy() + env['STRESS_TEST_URL'] = url + env['STRESS_CONCURRENT_USERS'] = str(config.get('concurrent_users', 10)) + env['STRESS_REQUESTS_PER_USER'] = str(config.get('requests_per_user', 20)) + env['STRESS_TIMEOUT'] = str(config.get('timeout', 30)) + + print("\n" + "=" * 60) + print("Running Backend API Load Tests") + print("=" * 60) + print(f" URL: {url}") + print(f" Concurrent Users: {config.get('concurrent_users', 10)}") + print(f" Requests/User: {config.get('requests_per_user', 20)}") + print() + + start_time = time.time() + result = subprocess.run( + ['python', '-m', 'pytest', 'tests/stress/test_api_load.py', '-v', '-s', '--tb=short'], + env=env, + capture_output=False, + cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ) + duration = time.time() - start_time + + return { + 'name': 'Backend API Load Tests', + 'passed': result.returncode == 0, + 'duration': duration, + 'returncode': result.returncode + } + + +def run_frontend_tests(url: str, config: dict) -> dict: + """Run frontend Playwright stress tests.""" + env = os.environ.copy() + env['STRESS_TEST_URL'] = url + + print("\n" + "=" * 60) + print("Running Frontend Playwright Stress Tests") + print("=" * 60) + print(f" URL: {url}") + print() + + start_time = time.time() + result = subprocess.run( + ['python', '-m', 'pytest', 'tests/stress/test_frontend_stress.py', '-v', '-s', '--tb=short'], + env=env, + capture_output=False, + cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ) + duration = time.time() - start_time + + return { + 'name': 'Frontend Playwright Stress Tests', + 'passed': result.returncode == 0, + 'duration': duration, + 'returncode': result.returncode + } + + +def generate_report(results: list, url: str, config: dict) -> str: + """Generate a text report of stress test results.""" + report_lines = [ + "=" * 60, + "MES Dashboard Stress Test Report", + "=" * 60, + f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + f"Target URL: {url}", + f"Configuration: {config}", + "", + "-" * 60, + "Test Results:", + "-" * 60, + ] + + total_duration = 0 + passed_count = 0 + + for result in results: + status = "PASSED" if result['passed'] else "FAILED" + report_lines.append(f" {result['name']}: {status}") + report_lines.append(f" Duration: {result['duration']:.2f}s") + total_duration += result['duration'] + if result['passed']: + passed_count += 1 + + report_lines.extend([ + "", + "-" * 60, + "Summary:", + "-" * 60, + f" Total Tests: {len(results)}", + f" Passed: {passed_count}", + f" Failed: {len(results) - passed_count}", + f" Total Duration: {total_duration:.2f}s", + "=" * 60, + ]) + + return "\n".join(report_lines) + + +def main(): + parser = argparse.ArgumentParser(description='Run MES Dashboard stress tests') + parser.add_argument('--backend-only', action='store_true', help='Run only backend tests') + parser.add_argument('--frontend-only', action='store_true', help='Run only frontend tests') + parser.add_argument('--quick', action='store_true', help='Quick test with minimal load') + parser.add_argument('--heavy', action='store_true', help='Heavy load test') + parser.add_argument('--url', default='http://127.0.0.1:5000', help='Target URL') + parser.add_argument('--report', help='Save report to file') + + args = parser.parse_args() + + # Configure load levels + if args.quick: + config = { + 'concurrent_users': 3, + 'requests_per_user': 5, + 'timeout': 30 + } + elif args.heavy: + config = { + 'concurrent_users': 50, + 'requests_per_user': 50, + 'timeout': 60 + } + else: + config = { + 'concurrent_users': 10, + 'requests_per_user': 20, + 'timeout': 30 + } + + print("\n" + "=" * 60) + print("MES Dashboard Stress Test Suite") + print("=" * 60) + print(f"Target: {args.url}") + print(f"Mode: {'Quick' if args.quick else 'Heavy' if args.heavy else 'Normal'}") + print() + + results = [] + + # Run tests based on flags + if not args.frontend_only: + results.append(run_backend_tests(args.url, config)) + + if not args.backend_only: + results.append(run_frontend_tests(args.url, config)) + + # Generate report + report = generate_report(results, args.url, config) + print("\n" + report) + + # Save report if requested + if args.report: + with open(args.report, 'w', encoding='utf-8') as f: + f.write(report) + print(f"\nReport saved to: {args.report}") + + # Exit with appropriate code + all_passed = all(r['passed'] for r in results) + sys.exit(0 if all_passed else 1) + + +if __name__ == '__main__': + main() diff --git a/tests/stress/__init__.py b/tests/stress/__init__.py new file mode 100644 index 0000000..d4c21e5 --- /dev/null +++ b/tests/stress/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +"""Stress tests for MES Dashboard.""" diff --git a/tests/stress/conftest.py b/tests/stress/conftest.py new file mode 100644 index 0000000..560e11f --- /dev/null +++ b/tests/stress/conftest.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +"""Pytest configuration for stress tests.""" + +import pytest +import os +import sys +import time +from dataclasses import dataclass, field +from typing import List, Dict, Any + +# Add src to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src')) + + +@dataclass +class StressTestResult: + """Container for stress test results.""" + test_name: str + total_requests: int = 0 + successful_requests: int = 0 + failed_requests: int = 0 + total_duration: float = 0.0 + min_response_time: float = float('inf') + max_response_time: float = 0.0 + response_times: List[float] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + + @property + def avg_response_time(self) -> float: + if not self.response_times: + return 0.0 + return sum(self.response_times) / len(self.response_times) + + @property + def success_rate(self) -> float: + if self.total_requests == 0: + return 0.0 + return (self.successful_requests / self.total_requests) * 100 + + @property + def requests_per_second(self) -> float: + if self.total_duration == 0: + return 0.0 + return self.total_requests / self.total_duration + + def add_success(self, response_time: float): + self.total_requests += 1 + self.successful_requests += 1 + self.response_times.append(response_time) + self.min_response_time = min(self.min_response_time, response_time) + self.max_response_time = max(self.max_response_time, response_time) + + def add_failure(self, error: str, response_time: float = 0): + self.total_requests += 1 + self.failed_requests += 1 + self.errors.append(error) + if response_time > 0: + self.response_times.append(response_time) + + def report(self) -> str: + """Generate human-readable report.""" + lines = [ + f"\n{'='*60}", + f"Stress Test Report: {self.test_name}", + f"{'='*60}", + f"Total Requests: {self.total_requests}", + f"Successful: {self.successful_requests}", + f"Failed: {self.failed_requests}", + f"Success Rate: {self.success_rate:.2f}%", + f"{'─'*60}", + f"Total Duration: {self.total_duration:.2f}s", + f"Requests/Second: {self.requests_per_second:.2f}", + f"{'─'*60}", + f"Min Response Time: {self.min_response_time*1000:.2f}ms" if self.min_response_time != float('inf') else "Min Response Time: N/A", + f"Max Response Time: {self.max_response_time*1000:.2f}ms", + f"Avg Response Time: {self.avg_response_time*1000:.2f}ms", + f"{'='*60}", + ] + if self.errors: + lines.append(f"Errors (first 5):") + for err in self.errors[:5]: + lines.append(f" - {err[:100]}") + return "\n".join(lines) + + +@pytest.fixture(scope="session") +def base_url() -> str: + """Get the base URL for stress testing.""" + return os.environ.get('STRESS_TEST_URL', 'http://127.0.0.1:5000') + + +@pytest.fixture(scope="session") +def stress_config() -> Dict[str, Any]: + """Get stress test configuration.""" + return { + 'concurrent_users': int(os.environ.get('STRESS_CONCURRENT_USERS', '10')), + 'requests_per_user': int(os.environ.get('STRESS_REQUESTS_PER_USER', '20')), + 'ramp_up_time': float(os.environ.get('STRESS_RAMP_UP_TIME', '2.0')), + 'timeout': float(os.environ.get('STRESS_TIMEOUT', '30.0')), + } + + +@pytest.fixture +def stress_result(): + """Factory fixture to create stress test results.""" + def _create_result(test_name: str) -> StressTestResult: + return StressTestResult(test_name=test_name) + return _create_result + + +def pytest_configure(config): + """Add custom markers for stress tests.""" + config.addinivalue_line( + "markers", "stress: mark test as stress test (may take longer)" + ) + config.addinivalue_line( + "markers", "load: mark test as load test (concurrent requests)" + ) diff --git a/tests/stress/test_api_load.py b/tests/stress/test_api_load.py new file mode 100644 index 0000000..1d07cfc --- /dev/null +++ b/tests/stress/test_api_load.py @@ -0,0 +1,327 @@ +# -*- coding: utf-8 -*- +"""Backend API load tests. + +Tests API endpoints under concurrent load to verify: +- Connection pool stability +- Timeout handling +- Response consistency under pressure + +Run with: pytest tests/stress/test_api_load.py -v -s +""" + +import pytest +import time +import requests +import concurrent.futures +from typing import List, Tuple + +# Import from local conftest via pytest fixtures + + +@pytest.mark.stress +@pytest.mark.load +class TestAPILoadConcurrent: + """Load tests with concurrent requests.""" + + def _make_request(self, url: str, timeout: float) -> Tuple[bool, float, str]: + """Make a single request and return (success, duration, error).""" + start = time.time() + try: + response = requests.get(url, timeout=timeout) + duration = time.time() - start + if response.status_code == 200: + data = response.json() + if data.get('success'): + return (True, duration, '') + return (False, duration, f"API returned success=false: {data.get('error', 'unknown')}") + return (False, duration, f"HTTP {response.status_code}") + except requests.exceptions.Timeout: + duration = time.time() - start + return (False, duration, "Request timeout") + except requests.exceptions.ConnectionError as e: + duration = time.time() - start + return (False, duration, f"Connection error: {str(e)[:50]}") + except Exception as e: + duration = time.time() - start + return (False, duration, f"Error: {str(e)[:50]}") + + def test_wip_summary_concurrent_load(self, base_url: str, stress_config: dict, stress_result): + """Test WIP summary API under concurrent load.""" + result = stress_result("WIP Summary Concurrent Load") + url = f"{base_url}/api/wip/overview/summary" + concurrent_users = stress_config['concurrent_users'] + requests_per_user = stress_config['requests_per_user'] + timeout = stress_config['timeout'] + + total_requests = concurrent_users * requests_per_user + + start_time = time.time() + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor: + futures = [ + executor.submit(self._make_request, url, timeout) + for _ in range(total_requests) + ] + + for future in concurrent.futures.as_completed(futures): + success, duration, error = future.result() + if success: + result.add_success(duration) + else: + result.add_failure(error, duration) + + result.total_duration = time.time() - start_time + + print(result.report()) + + # Assertions + assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%" + assert result.avg_response_time < 10.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 10s" + + def test_wip_matrix_concurrent_load(self, base_url: str, stress_config: dict, stress_result): + """Test WIP matrix API under concurrent load.""" + result = stress_result("WIP Matrix Concurrent Load") + url = f"{base_url}/api/wip/overview/matrix" + concurrent_users = stress_config['concurrent_users'] + requests_per_user = stress_config['requests_per_user'] + timeout = stress_config['timeout'] + + total_requests = concurrent_users * requests_per_user + + start_time = time.time() + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor: + futures = [ + executor.submit(self._make_request, url, timeout) + for _ in range(total_requests) + ] + + for future in concurrent.futures.as_completed(futures): + success, duration, error = future.result() + if success: + result.add_success(duration) + else: + result.add_failure(error, duration) + + result.total_duration = time.time() - start_time + + print(result.report()) + + assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%" + assert result.avg_response_time < 15.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 15s" + + def test_resource_summary_concurrent_load(self, base_url: str, stress_config: dict, stress_result): + """Test resource summary API under concurrent load.""" + result = stress_result("Resource Summary Concurrent Load") + url = f"{base_url}/api/resource/summary" + concurrent_users = stress_config['concurrent_users'] + requests_per_user = stress_config['requests_per_user'] + timeout = stress_config['timeout'] + + total_requests = concurrent_users * requests_per_user + + start_time = time.time() + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor: + futures = [ + executor.submit(self._make_request, url, timeout) + for _ in range(total_requests) + ] + + for future in concurrent.futures.as_completed(futures): + success, duration, error = future.result() + if success: + result.add_success(duration) + else: + result.add_failure(error, duration) + + result.total_duration = time.time() - start_time + + print(result.report()) + + assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%" + + def test_mixed_endpoints_concurrent_load(self, base_url: str, stress_config: dict, stress_result): + """Test multiple API endpoints simultaneously.""" + result = stress_result("Mixed Endpoints Concurrent Load") + endpoints = [ + f"{base_url}/api/wip/overview/summary", + f"{base_url}/api/wip/overview/matrix", + f"{base_url}/api/wip/overview/hold", + f"{base_url}/api/wip/meta/workcenters", + f"{base_url}/api/resource/summary", + ] + concurrent_users = stress_config['concurrent_users'] + timeout = stress_config['timeout'] + + # 5 requests per endpoint per user + requests_per_endpoint = 5 + total_requests = concurrent_users * len(endpoints) * requests_per_endpoint + + start_time = time.time() + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor: + futures = [] + for _ in range(concurrent_users): + for endpoint in endpoints: + for _ in range(requests_per_endpoint): + futures.append(executor.submit(self._make_request, endpoint, timeout)) + + for future in concurrent.futures.as_completed(futures): + success, duration, error = future.result() + if success: + result.add_success(duration) + else: + result.add_failure(error, duration) + + result.total_duration = time.time() - start_time + + print(result.report()) + + assert result.success_rate >= 85.0, f"Success rate {result.success_rate:.1f}% is below 85%" + + +@pytest.mark.stress +@pytest.mark.load +class TestAPILoadRampUp: + """Load tests with gradual ramp-up.""" + + def _make_request(self, url: str, timeout: float) -> Tuple[bool, float, str]: + """Make a single request and return (success, duration, error).""" + start = time.time() + try: + response = requests.get(url, timeout=timeout) + duration = time.time() - start + if response.status_code == 200: + data = response.json() + if data.get('success'): + return (True, duration, '') + return (False, duration, f"API error: {data.get('error', 'unknown')}") + return (False, duration, f"HTTP {response.status_code}") + except Exception as e: + duration = time.time() - start + return (False, duration, str(e)[:50]) + + def test_gradual_load_increase(self, base_url: str, stress_result): + """Test API stability as load gradually increases.""" + result = stress_result("Gradual Load Increase") + url = f"{base_url}/api/wip/overview/summary" + + # Start with 2 concurrent users, increase to 20 + load_levels = [2, 5, 10, 15, 20] + requests_per_level = 10 + timeout = 30.0 + + start_time = time.time() + + for concurrent_users in load_levels: + print(f"\n Testing with {concurrent_users} concurrent users...") + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor: + futures = [ + executor.submit(self._make_request, url, timeout) + for _ in range(requests_per_level) + ] + + for future in concurrent.futures.as_completed(futures): + success, duration, error = future.result() + if success: + result.add_success(duration) + else: + result.add_failure(error, duration) + + time.sleep(0.5) # Brief pause between levels + + result.total_duration = time.time() - start_time + + print(result.report()) + + assert result.success_rate >= 80.0, f"Success rate {result.success_rate:.1f}% is below 80%" + + +@pytest.mark.stress +class TestAPITimeoutHandling: + """Tests for timeout handling under load.""" + + def test_connection_recovery_after_timeout(self, base_url: str, stress_result): + """Test that API recovers after timeout scenarios.""" + result = stress_result("Connection Recovery After Timeout") + + # First, make requests with very short timeout to trigger timeouts + short_timeout_url = f"{base_url}/api/wip/overview/matrix" + + print("\n Phase 1: Triggering timeouts with 0.1s timeout...") + for _ in range(5): + start = time.time() + try: + requests.get(short_timeout_url, timeout=0.1) + result.add_success(time.time() - start) + except requests.exceptions.Timeout: + result.add_failure("Expected timeout", time.time() - start) + except Exception as e: + result.add_failure(str(e)[:50], time.time() - start) + + # Now verify system recovers with normal timeout + print(" Phase 2: Verifying recovery with 30s timeout...") + recovery_url = f"{base_url}/api/wip/overview/summary" + recovered = False + for i in range(10): + start = time.time() + try: + response = requests.get(recovery_url, timeout=30.0) + duration = time.time() - start + if response.status_code == 200 and response.json().get('success'): + result.add_success(duration) + recovered = True + print(f" Recovered on attempt {i+1}") + break + except Exception as e: + result.add_failure(str(e)[:50], time.time() - start) + time.sleep(0.5) + + result.total_duration = sum(result.response_times) + + print(result.report()) + + assert recovered, "System did not recover after timeout scenarios" + + +@pytest.mark.stress +class TestAPIResponseConsistency: + """Tests for response consistency under load.""" + + def test_response_data_consistency(self, base_url: str, stress_config: dict): + """Verify API returns consistent data structure under load.""" + url = f"{base_url}/api/wip/overview/summary" + concurrent_users = 5 + requests_per_user = 10 + timeout = 30.0 + + responses = [] + + def make_request(): + try: + response = requests.get(url, timeout=timeout) + if response.status_code == 200: + return response.json() + except Exception: + pass + return None + + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor: + futures = [ + executor.submit(make_request) + for _ in range(concurrent_users * requests_per_user) + ] + + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result: + responses.append(result) + + # Verify all successful responses have consistent structure + assert len(responses) > 0, "No successful responses received" + + first_response = responses[0] + required_fields = {'success'} + + for i, response in enumerate(responses): + for field in required_fields: + assert field in response, f"Response {i} missing field '{field}'" + + print(f"\n Received {len(responses)} consistent responses") diff --git a/tests/stress/test_frontend_stress.py b/tests/stress/test_frontend_stress.py new file mode 100644 index 0000000..d83e2b9 --- /dev/null +++ b/tests/stress/test_frontend_stress.py @@ -0,0 +1,366 @@ +# -*- coding: utf-8 -*- +"""Frontend stress tests using Playwright. + +Tests frontend stability under high-frequency operations: +- Toast notification system under rapid fire +- MesApi client under rapid requests +- AbortController behavior +- Page navigation stress + +Run with: pytest tests/stress/test_frontend_stress.py -v -s +""" + +import pytest +import time +import re +from playwright.sync_api import Page, expect + + +@pytest.fixture(scope="session") +def app_server() -> str: + """Get the base URL for stress testing.""" + import os + return os.environ.get('STRESS_TEST_URL', 'http://127.0.0.1:5000') + + +@pytest.fixture(scope="session") +def browser_context_args(browser_context_args): + """Configure browser context for stress tests.""" + return { + **browser_context_args, + "viewport": {"width": 1280, "height": 720}, + "locale": "zh-TW", + } + + +def load_page_with_js(page: Page, url: str, timeout: int = 60000): + """Load page and wait for JS to initialize.""" + page.goto(url, wait_until='domcontentloaded', timeout=timeout) + page.wait_for_timeout(1000) # Allow JS initialization + + +@pytest.mark.stress +class TestToastStress: + """Stress tests for Toast notification system.""" + + def test_rapid_toast_creation(self, page: Page, app_server: str): + """Test Toast system under rapid creation - should enforce max limit.""" + load_page_with_js(page, f"{app_server}/tables") + + # Create 50 toasts rapidly + start_time = time.time() + for i in range(50): + page.evaluate(f"Toast.info('Rapid toast {i}')") + + creation_time = time.time() - start_time + print(f"\n Created 50 toasts in {creation_time:.3f}s") + + page.wait_for_timeout(500) + + # Should only have max 5 toasts visible + toast_count = page.locator('.mes-toast').count() + assert toast_count <= 5, f"Toast count {toast_count} exceeds max limit of 5" + print(f" Toast count enforced: {toast_count} (max 5)") + + def test_toast_type_cycling(self, page: Page, app_server: str): + """Test rapid cycling through all toast types - system remains stable.""" + load_page_with_js(page, f"{app_server}/tables") + + toast_types = ['info', 'success', 'warning', 'error'] + + start_time = time.time() + for i in range(100): + toast_type = toast_types[i % len(toast_types)] + page.evaluate(f"Toast.{toast_type}('Type cycle {i}')") + + cycle_time = time.time() - start_time + print(f"\n Cycled 100 toasts in {cycle_time:.3f}s") + + # Wait for animations to complete + page.wait_for_timeout(1000) + + # Dismiss all and verify system can recover + page.evaluate("Toast.dismissAll()") + page.wait_for_timeout(500) + + toast_count = page.locator('.mes-toast').count() + assert toast_count <= 5, f"Toast overflow after dismissAll: {toast_count}" + print(f" System stable after cleanup, toast count: {toast_count}") + + def test_toast_dismiss_stress(self, page: Page, app_server: str): + """Test rapid toast creation and dismissal.""" + load_page_with_js(page, f"{app_server}/tables") + + start_time = time.time() + + # Create and immediately dismiss + for i in range(30): + toast_id = page.evaluate(f"Toast.info('Dismiss test {i}')") + page.evaluate(f"Toast.dismiss({toast_id})") + + dismiss_time = time.time() - start_time + print(f"\n Created and dismissed 30 toasts in {dismiss_time:.3f}s") + + page.wait_for_timeout(500) + + # Should have no or few toasts + toast_count = page.locator('.mes-toast').count() + assert toast_count <= 2, f"Undismissed toasts remain: {toast_count}" + print(f" Remaining toasts: {toast_count}") + + def test_loading_toast_stress(self, page: Page, app_server: str): + """Test loading toasts can be created and properly dismissed.""" + load_page_with_js(page, f"{app_server}/tables") + + toast_ids = [] + + # Create 10 loading toasts + for i in range(10): + toast_id = page.evaluate(f"Toast.loading('Loading {i}...')") + toast_ids.append(toast_id) + + page.wait_for_timeout(200) + + # Loading toasts are created + loading_count = page.locator('.mes-toast-loading').count() + print(f"\n Created {len(toast_ids)} loading toasts, visible: {loading_count}") + + # Dismiss all using dismissAll + page.evaluate("Toast.dismissAll()") + page.wait_for_timeout(500) + + # All should be gone after dismissAll + loading_count = page.locator('.mes-toast-loading').count() + assert loading_count == 0, f"Loading toasts not dismissed: {loading_count}" + print(f" Loading toast dismiss test passed") + + +@pytest.mark.stress +class TestMesApiStress: + """Stress tests for MesApi client.""" + + def test_rapid_api_requests(self, page: Page, app_server: str): + """Test MesApi under rapid sequential requests.""" + load_page_with_js(page, f"{app_server}/tables") + + # Make 20 rapid API requests + results = page.evaluate(""" + async () => { + const results = []; + const startTime = Date.now(); + + for (let i = 0; i < 20; i++) { + try { + const response = await MesApi.get('/api/wip/meta/workcenters'); + results.push({ success: true, status: response?.status || 'ok' }); + } catch (e) { + results.push({ success: false, error: e.message }); + } + } + + return { + results, + duration: Date.now() - startTime, + successCount: results.filter(r => r.success).length + }; + } + """) + + print(f"\n 20 requests in {results['duration']}ms") + print(f" Success: {results['successCount']}/20") + + assert results['successCount'] >= 15, f"Too many failures: {20 - results['successCount']}" + + def test_concurrent_api_requests(self, page: Page, app_server: str): + """Test MesApi with concurrent requests using Promise.all.""" + load_page_with_js(page, f"{app_server}/tables") + + # Make 10 concurrent requests + results = page.evaluate(""" + async () => { + const endpoints = [ + '/api/wip/overview/summary', + '/api/wip/overview/matrix', + '/api/wip/meta/workcenters', + '/api/wip/meta/packages', + ]; + + const startTime = Date.now(); + const promises = []; + + // 2 requests per endpoint = 8 total concurrent + for (const endpoint of endpoints) { + promises.push(MesApi.get(endpoint).catch(e => ({ error: e.message }))); + promises.push(MesApi.get(endpoint).catch(e => ({ error: e.message }))); + } + + const results = await Promise.all(promises); + const successCount = results.filter(r => !r.error).length; + + return { + duration: Date.now() - startTime, + total: results.length, + successCount + }; + } + """) + + print(f"\n {results['total']} concurrent requests in {results['duration']}ms") + print(f" Success: {results['successCount']}/{results['total']}") + + assert results['successCount'] >= 6, f"Too many concurrent failures" + + def test_abort_controller_stress(self, page: Page, app_server: str): + """Test AbortController under rapid request cancellation.""" + load_page_with_js(page, f"{app_server}/tables") + + # Start requests and cancel them rapidly + results = page.evaluate(""" + async () => { + const results = { started: 0, aborted: 0, completed: 0, errors: 0 }; + + for (let i = 0; i < 10; i++) { + results.started++; + + const controller = new AbortController(); + + const request = fetch('/api/wip/overview/summary', { + signal: controller.signal + }).then(() => { + results.completed++; + }).catch(e => { + if (e.name === 'AbortError') { + results.aborted++; + } else { + results.errors++; + } + }); + + // Cancel after 50ms + setTimeout(() => controller.abort(), 50); + + await new Promise(resolve => setTimeout(resolve, 100)); + } + + return results; + } + """) + + print(f"\n Started: {results['started']}") + print(f" Aborted: {results['aborted']}") + print(f" Completed: {results['completed']}") + print(f" Errors: {results['errors']}") + + # Most should either abort or complete + total_resolved = results['aborted'] + results['completed'] + assert total_resolved >= 5, f"Too many unresolved requests" + + +@pytest.mark.stress +class TestPageNavigationStress: + """Stress tests for rapid page navigation.""" + + def test_rapid_tab_switching(self, page: Page, app_server: str): + """Test rapid tab switching in portal.""" + page.goto(app_server, wait_until='domcontentloaded', timeout=30000) + page.wait_for_timeout(500) + + tabs = [ + '.tab:has-text("WIP 即時概況")', + '.tab:has-text("機台狀態報表")', + '.tab:has-text("數據表查詢工具")', + '.tab:has-text("Excel 批次查詢")', + ] + + start_time = time.time() + + # Rapidly switch tabs 20 times + for i in range(20): + tab = tabs[i % len(tabs)] + page.locator(tab).click() + page.wait_for_timeout(50) + + switch_time = time.time() - start_time + print(f"\n 20 tab switches in {switch_time:.3f}s") + + # Page should still be responsive + expect(page.locator('h1')).to_contain_text('MES 報表入口') + print(" Portal remained stable") + + def test_portal_iframe_stress(self, page: Page, app_server: str): + """Test portal remains responsive with iframe loading.""" + page.goto(app_server, wait_until='domcontentloaded', timeout=30000) + page.wait_for_timeout(500) + + # Switch through all tabs + tabs = [ + 'WIP 即時概況', + '機台狀態報表', + '數據表查詢工具', + 'Excel 批次查詢', + ] + + for tab_name in tabs: + page.locator(f'.tab:has-text("{tab_name}")').click() + page.wait_for_timeout(200) + + # Verify tab is active + tab = page.locator(f'.tab:has-text("{tab_name}")') + expect(tab).to_have_class(re.compile(r'active')) + + print(f"\n All {len(tabs)} tabs clickable and responsive") + + +@pytest.mark.stress +class TestMemoryStress: + """Tests for memory leak detection.""" + + def test_toast_memory_cleanup(self, page: Page, app_server: str): + """Check Toast system cleans up properly.""" + load_page_with_js(page, f"{app_server}/tables") + + # Create and dismiss many toasts + for batch in range(5): + for i in range(20): + page.evaluate(f"Toast.info('Memory test {batch}-{i}')") + page.evaluate("Toast.dismissAll()") + page.wait_for_timeout(100) + + page.wait_for_timeout(500) + + # Check DOM is clean + toast_count = page.locator('.mes-toast').count() + assert toast_count <= 5, f"Toast elements not cleaned up: {toast_count}" + print(f"\n Toast memory cleanup test passed (remaining: {toast_count})") + + +@pytest.mark.stress +class TestConsoleErrorMonitoring: + """Monitor for JavaScript errors under stress.""" + + def test_no_js_errors_under_stress(self, page: Page, app_server: str): + """Verify no JavaScript errors occur under stress conditions.""" + js_errors = [] + + page.on("pageerror", lambda error: js_errors.append(str(error))) + + load_page_with_js(page, f"{app_server}/tables") + + # Perform stress operations + for i in range(30): + page.evaluate(f"Toast.info('Error check {i}')") + + for i in range(10): + page.evaluate(""" + MesApi.get('/api/wip/overview/summary').catch(() => {}) + """) + + page.wait_for_timeout(2000) + + if js_errors: + print(f"\n JavaScript errors detected:") + for err in js_errors[:5]: + print(f" - {err[:100]}") + + assert len(js_errors) == 0, f"Found {len(js_errors)} JavaScript errors" + print("\n No JavaScript errors under stress")