Files
DashBoard/tests/stress/test_api_load.py
egg c38b5f646a feat(modernization): promote deferred routes to in-scope and unify page header styles
Promote /tables, /excel-query, /query-tool, /mid-section-defect from
deferred to full shell-governed in-scope routes with canonical redirects,
content contracts, governance artifacts, and updated CI gates.

Unify all page header gradients to #667eea → #764ba2 and h1 font-size
to 24px for visual consistency across all dashboard pages. Remove
Native Route-View dev annotations from job-query, excel-query, and
query-tool headers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 13:20:06 +08:00

499 lines
21 KiB
Python

# -*- coding: utf-8 -*-
"""Backend API load tests.
Tests API endpoints under concurrent load to verify:
- Connection pool stability
- Timeout handling
- Response consistency under pressure
Run with: pytest tests/stress/test_api_load.py -v -s
"""
import pytest
import time
import requests
import concurrent.futures
from typing import List, Tuple
from urllib.parse import quote
# Fixtures (base_url, stress_config, stress_result) are injected by pytest from the local conftest; nothing is imported here.
@pytest.mark.stress
@pytest.mark.load
class TestAPILoadConcurrent:
    """Load tests that fire many concurrent GET requests at the API.

    Every test follows the same pattern: build a list of request jobs, run
    them through a thread pool sized to ``concurrent_users``, record each
    outcome on the ``stress_result`` object, print its report, and assert on
    the aggregated success rate (and, for some endpoints, the average
    response time).
    """

    def _make_request(self, url: str, timeout: float, headers: dict | None = None) -> Tuple[bool, float, str]:
        """Issue one GET request and classify the outcome.

        Args:
            url: Absolute URL to request.
            timeout: Timeout in seconds handed to ``requests.get``.
            headers: Optional extra HTTP headers for the request.

        Returns:
            ``(success, duration, error)`` where ``success`` is True only for
            an HTTP 200 whose JSON body has a truthy ``success`` field,
            ``duration`` is the elapsed wall-clock time in seconds, and
            ``error`` is an empty string on success or a short description
            of the failure.
        """
        start = time.time()
        try:
            response = requests.get(url, timeout=timeout, headers=headers)
            duration = time.time() - start
            if response.status_code != 200:
                return (False, duration, f"HTTP {response.status_code}")
            data = response.json()
            if data.get('success'):
                return (True, duration, '')
            return (False, duration, f"API returned success=false: {data.get('error', 'unknown')}")
        except requests.exceptions.Timeout:
            duration = time.time() - start
            return (False, duration, "Request timeout")
        except requests.exceptions.ConnectionError as e:
            duration = time.time() - start
            return (False, duration, f"Connection error: {str(e)[:50]}")
        except Exception as e:
            # Catch-all (e.g. a non-JSON body) so one bad response is
            # recorded as a failure instead of aborting the whole run.
            duration = time.time() - start
            return (False, duration, f"Error: {str(e)[:50]}")

    def _run_load(self, result, jobs, max_workers: int) -> None:
        """Run request jobs concurrently and record every outcome on ``result``.

        Args:
            result: Object exposing ``add_success(duration)``,
                ``add_failure(error, duration)`` and a writable
                ``total_duration`` attribute.
            jobs: Iterable of ``(url, timeout, headers)`` tuples; ``headers``
                may be None.
            max_workers: Thread-pool size, i.e. the number of requests kept
                in flight at once.
        """
        start_time = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self._make_request, url, timeout, headers)
                for url, timeout, headers in jobs
            ]
            for future in concurrent.futures.as_completed(futures):
                success, duration, error = future.result()
                if success:
                    result.add_success(duration)
                else:
                    result.add_failure(error, duration)
        result.total_duration = time.time() - start_time

    def _discover_workcenter(self, base_url: str, timeout: float) -> str:
        """Return the name of the first workcenter listed by the API.

        Falls back to ``"TMTT"`` when the metadata endpoint does not return
        HTTP 200, returns no items, or anything raises along the way.
        """
        try:
            response = requests.get(f"{base_url}/api/wip/meta/workcenters", timeout=timeout)
            if response.status_code != 200:
                return "TMTT"
            payload = response.json()
            items = payload.get("data") or []
            if not items:
                return "TMTT"
            return str(items[0].get("name") or "TMTT")
        except Exception:
            # Discovery is best-effort; the load test proceeds with the default.
            return "TMTT"

    def _discover_hold_reason(self, base_url: str, timeout: float) -> str:
        """Return the first hold reason listed by the hold overview API.

        Falls back to ``"YieldLimit"`` when the endpoint does not return
        HTTP 200, returns no items, or anything raises along the way.
        """
        try:
            response = requests.get(f"{base_url}/api/wip/overview/hold", timeout=timeout)
            if response.status_code != 200:
                return "YieldLimit"
            payload = response.json()
            items = (payload.get("data") or {}).get("items") or []
            if not items:
                return "YieldLimit"
            return str(items[0].get("reason") or "YieldLimit")
        except Exception:
            # Discovery is best-effort; the load test proceeds with the default.
            return "YieldLimit"

    def test_wip_summary_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
        """Test WIP summary API under concurrent load."""
        result = stress_result("WIP Summary Concurrent Load")
        url = f"{base_url}/api/wip/overview/summary"
        concurrent_users = stress_config['concurrent_users']
        requests_per_user = stress_config['requests_per_user']
        timeout = stress_config['timeout']
        total_requests = concurrent_users * requests_per_user

        self._run_load(result, [(url, timeout, None)] * total_requests, concurrent_users)
        print(result.report())

        # Assertions
        assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%"
        assert result.avg_response_time < 10.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 10s"

    def test_wip_matrix_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
        """Test WIP matrix API under concurrent load."""
        result = stress_result("WIP Matrix Concurrent Load")
        url = f"{base_url}/api/wip/overview/matrix"
        concurrent_users = stress_config['concurrent_users']
        requests_per_user = stress_config['requests_per_user']
        timeout = stress_config['timeout']
        total_requests = concurrent_users * requests_per_user

        self._run_load(result, [(url, timeout, None)] * total_requests, concurrent_users)
        print(result.report())

        assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%"
        assert result.avg_response_time < 15.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 15s"

    def test_wip_detail_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
        """Test WIP detail API under concurrent load."""
        result = stress_result("WIP Detail Concurrent Load")
        concurrent_users = stress_config['concurrent_users']
        requests_per_user = stress_config['requests_per_user']
        timeout = stress_config['timeout']
        # The workcenter goes into the URL path, so it is percent-encoded.
        workcenter = self._discover_workcenter(base_url, timeout)
        url = f"{base_url}/api/wip/detail/{quote(workcenter)}?page=1&page_size=100"
        total_requests = concurrent_users * requests_per_user

        self._run_load(result, [(url, timeout, None)] * total_requests, concurrent_users)
        print(result.report())

        assert result.success_rate >= 85.0, f"Success rate {result.success_rate:.1f}% is below 85%"
        assert result.avg_response_time < 20.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 20s"

    def test_hold_detail_lots_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
        """Test hold-detail lots API under concurrent load."""
        result = stress_result("Hold Detail Lots Concurrent Load")
        concurrent_users = stress_config['concurrent_users']
        requests_per_user = stress_config['requests_per_user']
        timeout = stress_config['timeout']
        reason = self._discover_hold_reason(base_url, timeout)
        url = f"{base_url}/api/wip/hold-detail/lots?reason={quote(reason)}&page=1&per_page=50"
        total_requests = concurrent_users * requests_per_user

        self._run_load(result, [(url, timeout, None)] * total_requests, concurrent_users)
        print(result.report())

        assert result.success_rate >= 85.0, f"Success rate {result.success_rate:.1f}% is below 85%"
        assert result.avg_response_time < 20.0, f"Avg response time {result.avg_response_time:.2f}s exceeds 20s"

    def test_resource_summary_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
        """Test resource status summary API under concurrent load."""
        result = stress_result("Resource Status Summary Concurrent Load")
        url = f"{base_url}/api/resource/status/summary"
        concurrent_users = stress_config['concurrent_users']
        requests_per_user = stress_config['requests_per_user']
        timeout = stress_config['timeout']
        total_requests = concurrent_users * requests_per_user

        # Each request carries one of `concurrent_users` distinct
        # X-Forwarded-For addresses.
        # NOTE(review): presumably this keeps per-client rate limiting from
        # rejecting the burst - confirm against the server configuration.
        jobs = [
            (url, timeout, {"X-Forwarded-For": f"10.0.0.{(idx % concurrent_users) + 1}"})
            for idx in range(total_requests)
        ]
        self._run_load(result, jobs, concurrent_users)
        print(result.report())

        assert result.success_rate >= 90.0, f"Success rate {result.success_rate:.1f}% is below 90%"

    def test_mixed_endpoints_concurrent_load(self, base_url: str, stress_config: dict, stress_result):
        """Test multiple API endpoints simultaneously."""
        result = stress_result("Mixed Endpoints Concurrent Load")
        endpoints = [
            f"{base_url}/api/wip/overview/summary",
            f"{base_url}/api/wip/overview/matrix",
            f"{base_url}/api/wip/overview/hold",
            f"{base_url}/api/wip/meta/workcenters",
            f"{base_url}/api/resource/status/summary",
        ]
        concurrent_users = stress_config['concurrent_users']
        timeout = stress_config['timeout']
        # 5 requests per endpoint per user
        requests_per_endpoint = 5

        jobs = []
        for user_idx in range(concurrent_users):
            # One simulated client address per user, shared by all its requests.
            headers = {"X-Forwarded-For": f"10.0.1.{user_idx + 1}"}
            for endpoint in endpoints:
                jobs.extend([(endpoint, timeout, headers)] * requests_per_endpoint)

        self._run_load(result, jobs, concurrent_users)
        print(result.report())

        assert result.success_rate >= 85.0, f"Success rate {result.success_rate:.1f}% is below 85%"
@pytest.mark.stress
@pytest.mark.load
class TestAPILoadRampUp:
    """Load tests that step the concurrency level up in stages."""

    def _make_request(self, url: str, timeout: float) -> Tuple[bool, float, str]:
        """Perform one GET and report ``(success, duration, error)``.

        Success means HTTP 200 with a truthy ``success`` field in the JSON
        body. Any exception is reported as a failure whose message is the
        first 50 characters of the exception text.
        """
        began = time.time()
        try:
            reply = requests.get(url, timeout=timeout)
            elapsed = time.time() - began
            if reply.status_code != 200:
                return (False, elapsed, f"HTTP {reply.status_code}")
            body = reply.json()
            if not body.get('success'):
                return (False, elapsed, f"API error: {body.get('error', 'unknown')}")
            return (True, elapsed, '')
        except Exception as exc:
            elapsed = time.time() - began
            return (False, elapsed, str(exc)[:50])

    def test_gradual_load_increase(self, base_url: str, stress_result):
        """Check the summary API stays available while concurrency ramps up."""
        result = stress_result("Gradual Load Increase")
        url = f"{base_url}/api/wip/overview/summary"
        timeout = 30.0
        requests_per_level = 10
        # Pool sizes to step through: 2 workers up to 20.
        load_levels = (2, 5, 10, 15, 20)

        start_time = time.time()
        for concurrent_users in load_levels:
            print(f"\n Testing with {concurrent_users} concurrent users...")
            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as pool:
                pending = []
                for _ in range(requests_per_level):
                    pending.append(pool.submit(self._make_request, url, timeout))
                for finished in concurrent.futures.as_completed(pending):
                    ok, elapsed, message = finished.result()
                    if not ok:
                        result.add_failure(message, elapsed)
                    else:
                        result.add_success(elapsed)
            # Short breather before moving to the next level.
            time.sleep(0.5)
        result.total_duration = time.time() - start_time

        print(result.report())
        assert result.success_rate >= 80.0, f"Success rate {result.success_rate:.1f}% is below 80%"
@pytest.mark.stress
class TestAPITimeoutHandling:
    """Tests for timeout handling and recoverability under load."""

    @staticmethod
    def _make_request(url: str, timeout: float) -> Tuple[bool, float, str]:
        """Issue one GET request and classify the outcome.

        Args:
            url: Absolute URL to request.
            timeout: Timeout in seconds handed to ``requests.get``.

        Returns:
            ``(success, duration, error)``. An HTTP 200 with a JSON content
            type succeeds unless the body carries a falsy ``success`` field
            (a missing field counts as success); an HTTP 200 with any other
            content type always succeeds. Everything else is a failure with
            a short description in ``error``.
        """
        start = time.time()
        try:
            response = requests.get(url, timeout=timeout)
            duration = time.time() - start
            if response.status_code != 200:
                return (False, duration, f"HTTP {response.status_code}")
            if "application/json" in response.headers.get("Content-Type", ""):
                payload = response.json()
                if payload.get("success", True):
                    return (True, duration, "")
                return (False, duration, f"API returned success=false: {payload.get('error', 'unknown')}")
            return (True, duration, "")
        except requests.exceptions.Timeout:
            duration = time.time() - start
            return (False, duration, "Request timeout")
        except requests.exceptions.ConnectionError as exc:
            duration = time.time() - start
            return (False, duration, f"Connection error: {str(exc)[:50]}")
        except Exception as exc:
            # Catch-all so an unexpected error is recorded, not raised.
            duration = time.time() - start
            return (False, duration, f"Error: {str(exc)[:50]}")

    def test_connection_recovery_after_timeout(self, base_url: str, stress_result):
        """Test that API recovers after timeout scenarios.

        Phase 1 sends five requests with a 0.1s client timeout so that some
        of them are likely to be abandoned mid-flight. Phase 2 then polls the
        summary endpoint up to ten times and passes as soon as one attempt
        returns HTTP 200 with a truthy ``success`` field.
        """
        result = stress_result("Connection Recovery After Timeout")

        # First, make requests with very short timeout to trigger timeouts
        short_timeout_url = f"{base_url}/api/wip/overview/matrix"
        print("\n Phase 1: Triggering timeouts with 0.1s timeout...")
        for _ in range(5):
            start = time.time()
            try:
                requests.get(short_timeout_url, timeout=0.1)
                # Any response inside 0.1s counts as a success here,
                # whatever its status code.
                result.add_success(time.time() - start)
            except requests.exceptions.Timeout:
                result.add_failure("Expected timeout", time.time() - start)
            except Exception as e:
                result.add_failure(str(e)[:50], time.time() - start)

        # Now verify system recovers with normal timeout
        print(" Phase 2: Verifying recovery with 30s timeout...")
        recovery_url = f"{base_url}/api/wip/overview/summary"
        recovered = False
        for i in range(10):
            start = time.time()
            try:
                response = requests.get(recovery_url, timeout=30.0)
                duration = time.time() - start
                if response.status_code == 200 and response.json().get('success'):
                    result.add_success(duration)
                    recovered = True
                    print(f" Recovered on attempt {i+1}")
                    break
                # Record attempts that answered but were not healthy, so the
                # report reflects every recovery attempt, not only the ones
                # that raised.
                result.add_failure(f"Not recovered: HTTP {response.status_code}", duration)
            except Exception as e:
                result.add_failure(str(e)[:50], time.time() - start)
            time.sleep(0.5)

        # Sum of the recorded request durations, so the pauses between
        # attempts are not counted.
        # NOTE(review): assumes `response_times` holds every duration that
        # should count - confirm against the result class.
        result.total_duration = sum(result.response_times)
        print(result.report())
        assert recovered, "System did not recover after timeout scenarios"

    def test_wip_pages_recoverability_after_burst(self, base_url: str, stress_result):
        """After a burst, health and critical WIP APIs should still respond."""
        result = stress_result("WIP Pages Recoverability After Burst")
        timeout = 30.0
        # Endpoints hammered during the burst; the health endpoint is only
        # probed afterwards.
        burst_endpoints = [
            f"{base_url}/api/wip/overview/summary",
            f"{base_url}/api/wip/overview/matrix",
            f"{base_url}/api/wip/overview/hold",
        ]
        health_url = f"{base_url}/health"

        # Burst phase
        burst_count = 40
        start_time = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(self._make_request, endpoint, timeout)
                for _ in range(burst_count)
                for endpoint in burst_endpoints
            ]
            for future in concurrent.futures.as_completed(futures):
                success, duration, error = future.result()
                if success:
                    result.add_success(duration)
                else:
                    result.add_failure(error, duration)

        # Recoverability probes: the health endpoint only has to answer with
        # a recognised status value, so a 503 that reports "degraded" or
        # "unhealthy" still counts as responsive.
        healthy_probes = 0
        for _ in range(5):
            probe_start = time.time()
            try:
                response = requests.get(health_url, timeout=5)
                duration = time.time() - probe_start
                if response.status_code in (200, 503):
                    payload = response.json()
                    if payload.get("status") in {"healthy", "degraded", "unhealthy"}:
                        healthy_probes += 1
                        result.add_success(duration)
                        continue
                result.add_failure(f"Unexpected health response: {response.status_code}", duration)
            except Exception as exc:
                result.add_failure(str(exc)[:80], time.time() - probe_start)
            # Back off briefly after a failed probe before trying again.
            time.sleep(0.2)

        result.total_duration = time.time() - start_time
        print(result.report())
        assert healthy_probes >= 3, f"Health endpoint recoverability too low: {healthy_probes}/5"
@pytest.mark.stress
class TestAPIResponseConsistency:
    """Tests for response consistency under load."""

    def test_response_data_consistency(self, base_url: str, stress_config: dict):
        """Verify API returns consistent data structure under load.

        Sends 50 requests to the WIP summary endpoint through a pool of five
        workers, keeps every truthy HTTP 200 JSON body, and checks that each
        one contains the required top-level fields. ``stress_config`` is not
        read; it stays in the signature so the fixture request is unchanged.
        """
        url = f"{base_url}/api/wip/overview/summary"
        concurrent_users = 5
        requests_per_user = 10
        timeout = 30.0
        required_fields = {'success'}

        def make_request():
            """Return the decoded JSON body of an HTTP 200 reply, else None."""
            try:
                response = requests.get(url, timeout=timeout)
                if response.status_code == 200:
                    return response.json()
            except Exception:
                # Deliberately best-effort: this test checks only the shape
                # of the replies that do arrive, so transport and decode
                # errors are dropped.
                return None
            return None

        responses = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [
                executor.submit(make_request)
                for _ in range(concurrent_users * requests_per_user)
            ]
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    responses.append(result)

        # Verify all successful responses have consistent structure
        assert len(responses) > 0, "No successful responses received"
        for i, response in enumerate(responses):
            for field in required_fields:
                assert field in response, f"Response {i} missing field '{field}'"
        print(f"\n Received {len(responses)} consistent responses")