feat: relax query limits for query-tool and mid-section-defect

This commit is contained in:
egg
2026-03-02 08:20:55 +08:00
parent 5d58ac551d
commit cdf6f67c54
15 changed files with 1177 additions and 103 deletions

View File

@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
"""E2E UI/UX resilience tests for query-tool interactions."""
from __future__ import annotations
import json
import re
from urllib.parse import quote
import pytest
import requests
from playwright.sync_api import Page, expect
QUERY_TOOL_BASE = "/portal-shell/query-tool"
def _intercept_navigation_as_admin(page: Page):
"""Force admin navigation payload and ensure query-tool route is visible."""
def handle_route(route):
response = route.fetch()
body = response.json()
body["is_admin"] = True
drawers = body.get("drawers", [])
query_tool_entry = {
"name": "批次追蹤工具",
"order": 4,
"route": "/query-tool",
"status": "dev",
}
has_query_tool = any(
page_item.get("route") == "/query-tool"
for drawer in drawers
for page_item in drawer.get("pages", [])
)
if not has_query_tool:
target_drawer = next((drawer for drawer in drawers if not drawer.get("admin_only")), None)
if target_drawer is None:
drawers.append(
{
"id": "e2e-test-drawer",
"name": "E2E Test",
"order": 999,
"admin_only": False,
"pages": [query_tool_entry],
}
)
else:
target_drawer.setdefault("pages", []).append(query_tool_entry)
body["drawers"] = drawers
route.fulfill(
status=response.status,
headers={**response.headers, "content-type": "application/json"},
body=json.dumps(body),
)
page.route("**/api/portal/navigation", handle_route)
@pytest.mark.e2e
class TestQueryToolUiUxE2E:
    """User-centric UI/UX flows on query-tool page.

    These tests drive a real browser (Playwright) against a running app
    server, with navigation intercepted so the query-tool route is visible.
    Timeouts are generous because the flows hit live backend endpoints.
    """

    def test_lot_multi_query_counter_and_url_round_trip(self, page: Page, app_server: str):
        """Multi-query input should sync counter + URL and survive reload."""
        _intercept_navigation_as_admin(page)
        page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="domcontentloaded", timeout=60000)
        page.wait_for_timeout(1500)
        # Values may be separated by newlines or commas; counter should read 3.
        visible_textarea = page.locator("textarea.query-tool-textarea:visible").first
        visible_textarea.fill("GA26010001\nGA26010002, GA26010003")
        expect(page.locator(".query-tool-input-counter:visible").first).to_contain_text("已輸入 3")
        visible_select = page.locator("select.query-tool-select:visible").first
        visible_select.select_option("work_order")
        # Any non-5xx resolve response is acceptable; we only need the request fired.
        with page.expect_response(lambda resp: "/api/query-tool/resolve" in resp.url and resp.status < 500, timeout=90000):
            page.locator("button:has-text('解析'):visible").first.click()
        page.wait_for_timeout(1000)
        # URL must encode tab + entered values so state survives a reload.
        assert "tab=lot" in page.url
        assert "lot_values=" in page.url or "values=" in page.url
        page.reload(wait_until="domcontentloaded")
        page.wait_for_timeout(1000)
        restored_text = page.locator("textarea.query-tool-textarea:visible").first.input_value()
        restored_values = [v.strip() for v in re.split(r"[\n,]", restored_text) if v.strip()]
        assert len(restored_values) >= 3

    def test_equipment_tab_cross_navigation_preserves_filters(self, page: Page, app_server: str):
        """Equipment filter/date state should persist across tab switching."""
        # Probe the backend for a real equipment id; skip when unavailable.
        equipment_resp = requests.get(f"{app_server}/api/query-tool/equipment-list", timeout=30)
        if equipment_resp.status_code != 200:
            pytest.skip("equipment-list API is unavailable")
        equipment_items = equipment_resp.json().get("data") or []
        if not equipment_items:
            pytest.skip("No equipment item available for E2E test")
        equipment_id = str(equipment_items[0].get("RESOURCEID") or "")
        if not equipment_id:
            pytest.skip("Unable to determine equipment id")
        _intercept_navigation_as_admin(page)
        start_date = "2026-01-01"
        end_date = "2026-01-31"
        # Deep-link directly into the equipment timeline sub-tab with filters.
        page.goto(
            f"{app_server}{QUERY_TOOL_BASE}"
            f"?tab=equipment&equipment_sub_tab=timeline"
            f"&equipment_ids={quote(equipment_id)}"
            f"&start_date={start_date}&end_date={end_date}",
            wait_until="domcontentloaded",
            timeout=60000,
        )
        page.wait_for_timeout(1500)
        date_inputs = page.locator("input[type='date']")
        expect(date_inputs.first).to_have_value(start_date)
        expect(date_inputs.nth(1)).to_have_value(end_date)
        js_errors = []
        page.on("pageerror", lambda error: js_errors.append(str(error)))
        with page.expect_response(lambda resp: "/api/query-tool/equipment-period" in resp.url and resp.status < 500, timeout=120000):
            page.locator("button:has-text('查詢'):visible").first.click()
        page.wait_for_timeout(1500)
        # Bounce to another tab and back; dates must survive the round trip.
        page.locator("button", has_text="批次追蹤(正向)").click()
        page.wait_for_timeout(400)
        page.locator("button", has_text="設備生產批次追蹤").click()
        page.wait_for_timeout(600)
        expect(date_inputs.first).to_have_value(start_date)
        expect(date_inputs.nth(1)).to_have_value(end_date)
        assert len(js_errors) == 0, f"JS errors found while switching tabs: {js_errors[:3]}"

    def test_rapid_resolve_and_tab_switching_no_ui_crash(self, page: Page, app_server: str):
        """Rapid resolve + tab switching should keep page responsive without crashes."""
        _intercept_navigation_as_admin(page)
        page.goto(f"{app_server}{QUERY_TOOL_BASE}?tab=lot", wait_until="domcontentloaded", timeout=60000)
        page.wait_for_timeout(1200)
        # Collect uncaught browser-side exceptions for the final assertion.
        js_errors = []
        page.on("pageerror", lambda error: js_errors.append(str(error)))
        # Seed lot tab query input.
        page.locator("select.query-tool-select:visible").first.select_option("work_order")
        page.locator("textarea.query-tool-textarea:visible").first.fill("GA26010001")
        # Cycle: resolve -> reverse tab -> new serial input -> resolve -> forward tab.
        for idx in range(4):
            with page.expect_response(lambda resp: "/api/query-tool/resolve" in resp.url and resp.status < 500, timeout=90000):
                page.locator("button:has-text('解析'):visible").first.click()
            page.wait_for_timeout(350)
            page.locator("button", has_text="流水批反查(反向)").click()
            page.wait_for_timeout(300)
            page.locator("select.query-tool-select:visible").first.select_option("serial_number")
            page.locator("textarea.query-tool-textarea:visible").first.fill(f"GMSN-STRESS-{idx:03d}")
            with page.expect_response(lambda resp: "/api/query-tool/resolve" in resp.url and resp.status < 500, timeout=90000):
                page.locator("button:has-text('解析'):visible").first.click()
            page.wait_for_timeout(300)
            page.locator("button", has_text="批次追蹤(正向)").click()
            page.wait_for_timeout(300)
        expect(page.locator("body")).to_be_visible()
        assert len(js_errors) == 0, f"Detected JS crash signals: {js_errors[:3]}"

View File

@@ -0,0 +1,371 @@
# -*- coding: utf-8 -*-
"""Query-tool specific stress coverage.
Focus:
- mixed multi-query soak behavior under concurrent traffic
- high-concurrency large payload handling (50 values per query)
- browser-side rapid interactions without JS crashes
"""
from __future__ import annotations
import concurrent.futures
import json
import os
import time
from typing import Any
import pytest
import requests
from playwright.sync_api import Page, expect
QUERY_TOOL_BASE = "/portal-shell/query-tool"
def _extract_container_id(payload: dict[str, Any]) -> str:
rows = payload.get("data") or []
if not rows:
return ""
row = rows[0] if isinstance(rows[0], dict) else {}
return str(
row.get("container_id")
or row.get("CONTAINERID")
or row.get("containerId")
or ""
)
def _intercept_navigation_as_admin(page: Page):
"""Inject query-tool route to portal navigation for stress browser tests."""
def handle_route(route):
response = route.fetch()
body = response.json()
body["is_admin"] = True
query_tool_entry = {
"name": "批次追蹤工具",
"order": 4,
"route": "/query-tool",
"status": "dev",
}
drawers = body.get("drawers", [])
has_query_tool = any(
page_item.get("route") == "/query-tool"
for drawer in drawers
for page_item in drawer.get("pages", [])
)
if not has_query_tool:
for drawer in drawers:
if not drawer.get("admin_only"):
drawer.setdefault("pages", []).append(query_tool_entry)
break
else:
drawers.append(
{
"id": "stress-test",
"name": "Stress Test",
"order": 999,
"admin_only": False,
"pages": [query_tool_entry],
}
)
body["drawers"] = drawers
route.fulfill(
status=response.status,
headers={**response.headers, "content-type": "application/json"},
body=json.dumps(body),
)
page.route("**/api/portal/navigation", handle_route)
@pytest.mark.stress
@pytest.mark.load
class TestQueryToolApiStress:
    """High-concurrency and soak tests for query-tool APIs.

    These tests hit a live server (``base_url``) and record per-request
    outcomes into the ``stress_result`` collector supplied by the fixture.
    """

    @staticmethod
    def _request(
        method: str,
        url: str,
        *,
        timeout: float,
        json_body: dict[str, Any] | None = None,
        allowed_statuses: set[int] | None = None,
    ) -> tuple[bool, float, str]:
        # Single HTTP probe. Returns (ok, elapsed_seconds, error_summary);
        # any status outside allowed_statuses (default {200}) counts as failure.
        start = time.time()
        try:
            response = requests.request(method, url, json=json_body, timeout=timeout)
            duration = time.time() - start
            statuses = allowed_statuses or {200}
            if response.status_code in statuses:
                return True, duration, ""
            return False, duration, f"HTTP {response.status_code}"
        except Exception as exc:  # pragma: no cover - network/runtime dependent
            duration = time.time() - start
            # Truncate exception text so error aggregation stays readable.
            return False, duration, str(exc)[:120]

    @staticmethod
    def _discover_targets(base_url: str, timeout: float) -> dict[str, str]:
        # Best-effort discovery of live equipment/container ids. Empty strings
        # mean the corresponding workload entries are skipped; failures here
        # are deliberately swallowed (server may be partially available).
        discovered = {
            "equipment_id": "",
            "equipment_name": "",
            "container_id": "",
        }
        try:
            equipment_resp = requests.get(f"{base_url}/api/query-tool/equipment-list", timeout=timeout)
            if equipment_resp.status_code == 200:
                items = (equipment_resp.json() or {}).get("data") or []
                if items:
                    discovered["equipment_id"] = str(items[0].get("RESOURCEID") or "")
                    discovered["equipment_name"] = str(items[0].get("RESOURCENAME") or "")
        except Exception:
            pass
        try:
            resolve_resp = requests.post(
                f"{base_url}/api/query-tool/resolve",
                json={
                    "input_type": "work_order",
                    "values": ["GA26010001"],
                },
                timeout=timeout,
            )
            if resolve_resp.status_code == 200:
                discovered["container_id"] = _extract_container_id(resolve_resp.json() or {})
        except Exception:
            pass
        return discovered

    def test_mixed_query_tool_soak_no_5xx_or_crash(
        self,
        base_url: str,
        stress_config: dict[str, Any],
        stress_result,
    ):
        """Run mixed query workload for a sustained period and verify recoverability."""
        result = stress_result("Query Tool Mixed Soak")
        timeout = stress_config["timeout"]
        # Clamp worker count to a sane band regardless of fixture config.
        concurrent_users = max(4, min(stress_config["concurrent_users"], 20))
        soak_seconds = int(os.environ.get("STRESS_QUERY_TOOL_SOAK_SECONDS", "45"))
        targets = self._discover_targets(base_url, timeout)
        equipment_id = targets["equipment_id"]
        equipment_name = targets["equipment_name"]
        container_id = targets["container_id"]
        # Baseline workload; entries are round-robined across workers.
        workload: list[dict[str, Any]] = [
            {
                "method": "GET",
                "url": f"{base_url}/api/query-tool/equipment-list",
                "allowed_statuses": {200},
            },
            {
                "method": "GET",
                "url": f"{base_url}/api/query-tool/workcenter-groups",
                "allowed_statuses": {200},
            },
            {
                "method": "POST",
                "url": f"{base_url}/api/query-tool/resolve",
                "json_body": {
                    "input_type": "lot_id",
                    "values": [f"STRESS-LOT-{idx:03d}" for idx in range(10)],
                },
                # 429 is acceptable (protection triggered, not process crash).
                "allowed_statuses": {200, 429},
            },
        ]
        if equipment_id:
            workload.extend(
                [
                    {
                        "method": "POST",
                        "url": f"{base_url}/api/query-tool/equipment-period",
                        "json_body": {
                            "equipment_ids": [equipment_id],
                            "equipment_names": [equipment_name] if equipment_name else [],
                            "start_date": "2026-01-01",
                            "end_date": "2026-01-31",
                            "query_type": "status_hours",
                        },
                        "allowed_statuses": {200, 429},
                    },
                    {
                        "method": "POST",
                        "url": f"{base_url}/api/query-tool/equipment-period",
                        "json_body": {
                            "equipment_ids": [equipment_id],
                            "equipment_names": [equipment_name] if equipment_name else [],
                            "start_date": "2026-01-01",
                            "end_date": "2026-01-31",
                            "query_type": "lots",
                        },
                        "allowed_statuses": {200, 429},
                    },
                ]
            )
        if container_id:
            workload.extend(
                [
                    {
                        "method": "GET",
                        "url": f"{base_url}/api/query-tool/lot-history?container_id={container_id}",
                        "allowed_statuses": {200, 429},
                    },
                    {
                        "method": "GET",
                        "url": f"{base_url}/api/query-tool/lot-associations?container_id={container_id}&type=materials",
                        "allowed_statuses": {200, 429},
                    },
                ]
            )
        stop_at = time.time() + soak_seconds

        def worker(worker_idx: int):
            # Each worker starts at a different offset so endpoints interleave.
            idx = worker_idx
            while time.time() < stop_at:
                spec = workload[idx % len(workload)]
                ok, duration, error = self._request(
                    spec["method"],
                    spec["url"],
                    timeout=timeout,
                    json_body=spec.get("json_body"),
                    allowed_statuses=spec.get("allowed_statuses"),
                )
                if ok:
                    result.add_success(duration)
                else:
                    result.add_failure(f"{error} @ {spec['url']}", duration)
                idx += 1
                # Small pacing delay to avoid pure hot-spin on the client side.
                time.sleep(0.02)

        start = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [executor.submit(worker, idx) for idx in range(concurrent_users)]
            for future in concurrent.futures.as_completed(futures):
                future.result()
        result.total_duration = time.time() - start
        print(result.report())
        # Process-level stability threshold (accepting 429 guardrails).
        assert result.total_requests >= concurrent_users * 10
        assert result.success_rate >= 85.0, f"Success rate too low: {result.success_rate:.2f}%"
        # Under gunicorn worker autorestart windows, `/workcenter-groups` can briefly
        # return 500 before filter-cache warm-up completes. Keep this tolerance tight
        # while still failing on any other 5xx regression.
        five_xx_errors = [err for err in result.errors if "HTTP 5" in err]
        unexpected_five_xx = [
            err for err in five_xx_errors
            if "/api/query-tool/workcenter-groups" not in err
        ]
        allowed_transient_five_xx = max(5, int(result.total_requests * 0.002))
        assert not unexpected_five_xx, f"Unexpected 5xx endpoints detected: {unexpected_five_xx[:5]}"
        assert len(five_xx_errors) <= allowed_transient_five_xx, (
            f"Too many 5xx responses ({len(five_xx_errors)} > {allowed_transient_five_xx}): "
            f"{five_xx_errors[:5]}"
        )
        # Post-soak recoverability probe: server must still answer health checks.
        health_resp = requests.get(f"{base_url}/health", timeout=10)
        assert health_resp.status_code in (200, 503)
        health_payload = health_resp.json()
        assert health_payload.get("status") in {"healthy", "degraded", "unhealthy"}

    def test_large_multi_value_resolve_high_concurrency_stability(
        self,
        base_url: str,
        stress_config: dict[str, Any],
        stress_result,
    ):
        """50-value resolve payloads under concurrency should avoid 5xx and stay recoverable."""
        result = stress_result("Query Tool Large Resolve Concurrency")
        timeout = stress_config["timeout"]
        concurrent_users = max(6, min(stress_config["concurrent_users"] * 2, 24))
        total_requests = max(30, concurrent_users * 3)

        def run_request(seed: int):
            # Each request carries a unique 50-value payload keyed by seed.
            payload = {
                "input_type": "lot_id",
                "values": [f"BULK-{seed:03d}-{idx:02d}" for idx in range(50)],
            }
            ok, duration, error = self._request(
                "POST",
                f"{base_url}/api/query-tool/resolve",
                timeout=timeout,
                json_body=payload,
                allowed_statuses={200, 429},
            )
            if ok:
                result.add_success(duration)
            else:
                result.add_failure(error, duration)

        start = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [executor.submit(run_request, idx) for idx in range(total_requests)]
            for future in concurrent.futures.as_completed(futures):
                future.result()
        result.total_duration = time.time() - start
        print(result.report())
        assert result.success_rate >= 95.0, f"Large resolve success rate too low: {result.success_rate:.2f}%"
        assert all("HTTP 5" not in err for err in result.errors), f"5xx detected: {result.errors[:5]}"
        # Post-burst recoverability probe.
        health_resp = requests.get(f"{base_url}/health", timeout=10)
        assert health_resp.status_code in (200, 503)
@pytest.mark.stress
class TestQueryToolBrowserStress:
    """Browser interaction stress focused on query-tool UI flow resilience."""

    def test_rapid_lot_reverse_switching_stays_responsive(self, page: Page, base_url: str):
        """Rapid resolve + tab switch cycles should not crash frontend runtime."""
        _intercept_navigation_as_admin(page)
        page.goto(f"{base_url}{QUERY_TOOL_BASE}?tab=lot", wait_until="domcontentloaded", timeout=60000)
        page.wait_for_timeout(1200)
        # Collect uncaught browser-side exceptions for the final assertion.
        js_errors = []
        page.on("pageerror", lambda error: js_errors.append(str(error)))
        page.locator("select.query-tool-select:visible").first.select_option("work_order")
        page.locator("textarea.query-tool-textarea:visible").first.fill("GA26010001")
        # 8 cycles of: resolve -> reverse tab -> new serial -> resolve -> forward tab.
        for idx in range(8):
            with page.expect_response(
                lambda resp: "/api/query-tool/resolve" in resp.url and resp.status < 500,
                timeout=90000,
            ):
                page.locator("button:has-text('解析'):visible").first.click()
            page.wait_for_timeout(250)
            page.locator("button", has_text="流水批反查(反向)").click()
            page.wait_for_timeout(200)
            page.locator("select.query-tool-select:visible").first.select_option("serial_number")
            page.locator("textarea.query-tool-textarea:visible").first.fill(f"GMSN-{idx:05d}")
            with page.expect_response(
                lambda resp: "/api/query-tool/resolve" in resp.url and resp.status < 500,
                timeout=90000,
            ):
                page.locator("button:has-text('解析'):visible").first.click()
            page.wait_for_timeout(250)
            page.locator("button", has_text="批次追蹤(正向)").click()
            page.wait_for_timeout(200)
        expect(page.locator("body")).to_be_visible()
        assert len(js_errors) == 0, f"Detected JS errors under rapid interaction: {js_errors[:3]}"

View File

@@ -35,10 +35,10 @@ def test_query_analysis_start_after_end_returns_error():
def test_query_analysis_exceeds_max_days_returns_error():
result = query_analysis('2025-01-01', '2025-12-31')
result = query_analysis('2025-01-01', '2026-01-02')
assert 'error' in result
assert '180' in result['error']
assert '365' in result['error']
@patch('mes_dashboard.services.mid_section_defect_service.query_analysis')

View File

@@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
"""Integration-style concurrency tests for query-tool routes.
Focus areas:
- multi-query payload handling under concurrent traffic
- mixed endpoint interaction stability
- oversized batch request rejection under burst load
- sustained repeated querying without process-level failures
"""
from __future__ import annotations
import concurrent.futures
import time
from typing import Any
from unittest.mock import patch
import pytest
from mes_dashboard import create_app
from mes_dashboard.core.cache import NoOpCache
@pytest.fixture
def app():
    """Build a fresh Flask app in testing mode with caching disabled."""
    flask_app = create_app("testing")
    flask_app.config["TESTING"] = True
    # NoOpCache keeps concurrency runs deterministic: no cross-request cache hits.
    flask_app.extensions["cache"] = NoOpCache()
    return flask_app
class TestQueryToolConcurrencyIntegration:
    """Concurrency-focused tests for query-tool route behavior.

    Service-layer functions and the rate limiter are patched so only the
    Flask route/serialization layer is exercised under threaded traffic.
    """

    @patch("mes_dashboard.core.rate_limit.check_and_record", return_value=(False, 0))
    @patch("mes_dashboard.routes.query_tool_routes.resolve_lots")
    def test_resolve_concurrent_multi_query_payloads_no_5xx(
        self,
        mock_resolve,
        _mock_rate_limit,
        app,
    ):
        """Concurrent resolve requests with 50-item payloads should stay stable."""
        def fake_resolve(input_type: str, values: list[str]) -> dict[str, Any]:
            # Simulate a slightly expensive resolve path.
            time.sleep(0.01)
            resolved = [
                {
                    "container_id": f"CID-{idx:03d}",
                    "input_value": value,
                    "input_type": input_type,
                }
                for idx, value in enumerate(values, start=1)
            ]
            return {
                "data": resolved,
                "total": len(resolved),
                "input_count": len(values),
                "not_found": [],
            }
        mock_resolve.side_effect = fake_resolve
        request_count = 36
        workers = 12

        def run_request(seed: int) -> tuple[int, dict[str, Any]]:
            # Each request gets its own test client (thread-safe usage).
            payload = {
                "input_type": "lot_id",
                "values": [f"LOT-{seed:03d}-{idx:02d}" for idx in range(50)],
            }
            with app.test_client() as client:
                response = client.post("/api/query-tool/resolve", json=payload)
                return response.status_code, response.get_json() or {}

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(run_request, idx) for idx in range(request_count)]
            results = [future.result() for future in concurrent.futures.as_completed(futures)]
        statuses = [status for status, _payload in results]
        payloads = [payload for _status, payload in results]
        assert all(status == 200 for status in statuses), f"Unexpected statuses: {statuses}"
        assert all("data" in payload for payload in payloads)
        assert all(payload.get("total") == 50 for payload in payloads)
        assert all(len(payload.get("not_found", [])) == 0 for payload in payloads)

    @patch("mes_dashboard.core.rate_limit.check_and_record", return_value=(False, 0))
    @patch("mes_dashboard.routes.query_tool_routes.get_lot_associations_batch")
    @patch("mes_dashboard.routes.query_tool_routes.get_lot_history_batch")
    def test_mixed_batch_history_and_association_queries_under_concurrency(
        self,
        mock_history_batch,
        mock_assoc_batch,
        _mock_rate_limit,
        app,
    ):
        """Concurrent mixed batch endpoints should preserve response contract."""
        # Echo-style stubs: one row per requested container id.
        mock_history_batch.side_effect = lambda cids, workcenter_groups=None: {
            "data": [
                {
                    "CONTAINERID": cid,
                    "WORKCENTER_GROUPS": workcenter_groups or [],
                }
                for cid in cids
            ],
            "total": len(cids),
        }
        mock_assoc_batch.side_effect = lambda cids, assoc_type: {
            "data": [
                {"CONTAINERID": cid, "TYPE": assoc_type}
                for cid in cids
            ],
            "total": len(cids),
        }
        request_count = 60
        workers = 16

        def run_request(index: int) -> tuple[int, dict[str, Any]]:
            # Interleave endpoints: every 3rd request hits lot-history,
            # the rest alternate between the two association types.
            with app.test_client() as client:
                if index % 3 == 0:
                    response = client.get(
                        "/api/query-tool/lot-history?"
                        "container_ids=CID-001,CID-002,CID-003&workcenter_groups=WB,FA"
                    )
                else:
                    assoc_type = "materials" if index % 2 == 0 else "rejects"
                    response = client.get(
                        "/api/query-tool/lot-associations?"
                        f"container_ids=CID-001,CID-002&type={assoc_type}"
                    )
                return response.status_code, response.get_json() or {}

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(run_request, idx) for idx in range(request_count)]
            results = [future.result() for future in concurrent.futures.as_completed(futures)]
        statuses = [status for status, _payload in results]
        payloads = [payload for _status, payload in results]
        assert all(status == 200 for status in statuses), f"Unexpected statuses: {statuses}"
        assert all("data" in payload for payload in payloads)
        assert all("total" in payload for payload in payloads)
        assert mock_history_batch.called
        assert mock_assoc_batch.called

    @patch("mes_dashboard.core.rate_limit.check_and_record", return_value=(False, 0))
    @patch("mes_dashboard.routes.query_tool_routes.get_lot_history_batch")
    def test_oversized_batch_burst_is_rejected_without_service_execution(
        self,
        mock_history_batch,
        _mock_rate_limit,
        app,
    ):
        """Burst oversized batch requests should short-circuit to 413 safely."""
        # 80 ids against a limit of 10 guarantees every request is oversized.
        app.config["QUERY_TOOL_MAX_CONTAINER_IDS"] = 10
        huge_ids = ",".join([f"CID-{idx:03d}" for idx in range(80)])
        request_count = 30
        workers = 12

        def run_request() -> tuple[int, float]:
            with app.test_client() as client:
                start = time.time()
                response = client.get(f"/api/query-tool/lot-history?container_ids={huge_ids}")
                return response.status_code, time.time() - start

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(run_request) for _ in range(request_count)]
            results = [future.result() for future in concurrent.futures.as_completed(futures)]
        statuses = [status for status, _duration in results]
        durations = [duration for _status, duration in results]
        assert all(status == 413 for status in statuses), f"Unexpected statuses: {statuses}"
        # Fast-fail guard: this should reject quickly, not run heavy logic.
        assert max(durations) < 1.0, f"Oversized requests were unexpectedly slow: {max(durations):.3f}s"
        mock_history_batch.assert_not_called()

    @patch("mes_dashboard.core.rate_limit.check_and_record", return_value=(False, 0))
    @patch("mes_dashboard.routes.query_tool_routes.resolve_lots")
    def test_sustained_resolve_sequence_remains_stable(
        self,
        mock_resolve,
        _mock_rate_limit,
        app,
    ):
        """Repeated resolve requests over time should not degrade to 5xx."""
        mock_resolve.side_effect = lambda input_type, values: {
            "data": [
                {"container_id": f"{input_type}-{idx}", "input_value": value}
                for idx, value in enumerate(values)
            ],
            "total": len(values),
            "input_count": len(values),
            "not_found": [],
        }
        # Collect non-200 statuses instead of failing fast so the full
        # 120-round sequence always runs.
        failures: list[int] = []
        with app.test_client() as client:
            for round_idx in range(120):
                response = client.post(
                    "/api/query-tool/resolve",
                    json={
                        "input_type": "lot_id",
                        "values": [f"GA2601{round_idx:03d}-A00-{idx:03d}" for idx in range(20)],
                    },
                )
                if response.status_code != 200:
                    failures.append(response.status_code)
        assert not failures, f"Sustained resolve produced failures: {failures[:10]}"

View File

@@ -11,9 +11,10 @@ import pytest
import json
from unittest.mock import patch, MagicMock
from mes_dashboard import create_app
from mes_dashboard.core.cache import NoOpCache
from mes_dashboard.core.rate_limit import reset_rate_limits_for_tests
from mes_dashboard import create_app
from mes_dashboard.core.cache import NoOpCache
from mes_dashboard.core.rate_limit import reset_rate_limits_for_tests
from mes_dashboard.services.query_tool_service import MAX_DATE_RANGE_DAYS, MAX_LOT_IDS
@pytest.fixture
@@ -117,21 +118,20 @@ class TestResolveEndpoint:
data = json.loads(response.data)
assert 'error' in data
def test_values_over_limit(self, client):
"""Should reject values exceeding limit."""
# More than MAX_LOT_IDS (50)
values = [f'GA{i:09d}' for i in range(51)]
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': values
def test_values_over_limit(self, client):
"""Should reject values exceeding limit."""
values = [f'GA{i:09d}' for i in range(MAX_LOT_IDS + 1)]
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': values
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '超過上限' in data['error'] or '50' in data['error']
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '超過上限' in data['error'] or str(MAX_LOT_IDS) in data['error']
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_success(self, mock_resolve, client):
@@ -677,21 +677,21 @@ class TestEquipmentPeriodEndpoint:
assert 'error' in data
assert '結束日期' in data['error'] or '早於' in data['error']
def test_date_range_exceeds_limit(self, client):
"""Should reject date range > 90 days."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-01-01',
'end_date': '2024-06-01',
'query_type': 'status_hours'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '90' in data['error']
def test_date_range_exceeds_limit(self, client):
"""Should reject date range greater than service max days."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-01-01',
'end_date': '2025-01-02',
'query_type': 'status_hours'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert str(MAX_DATE_RANGE_DAYS) in data['error']
def test_invalid_query_type(self, client):
"""Should reject invalid query_type."""

View File

@@ -18,12 +18,13 @@ from mes_dashboard.services.query_tool_service import (
_resolve_by_work_order,
get_lot_split_merge_history,
BATCH_SIZE,
MAX_LOT_IDS,
MAX_SERIAL_NUMBERS,
MAX_WORK_ORDERS,
MAX_EQUIPMENTS,
MAX_DATE_RANGE_DAYS,
)
MAX_LOT_IDS,
MAX_SERIAL_NUMBERS,
MAX_WORK_ORDERS,
MAX_GD_WORK_ORDERS,
MAX_EQUIPMENTS,
MAX_DATE_RANGE_DAYS,
)
class TestValidateDateRange:
@@ -51,18 +52,18 @@ class TestValidateDateRange:
assert result is not None
assert str(MAX_DATE_RANGE_DAYS) in result
def test_exactly_max_range(self):
"""Should allow exactly max range days."""
# 90 days from 2024-01-01 is 2024-03-31
result = validate_date_range('2024-01-01', '2024-03-31')
assert result is None
def test_one_day_over_max_range(self):
"""Should reject one day over max range."""
# 91 days
result = validate_date_range('2024-01-01', '2024-04-02')
assert result is not None
assert str(MAX_DATE_RANGE_DAYS) in result
def test_exactly_max_range(self):
"""Should allow exactly max range days."""
# 365 days from 2025-01-01 is 2026-01-01
result = validate_date_range('2025-01-01', '2026-01-01')
assert result is None
def test_one_day_over_max_range(self):
"""Should reject one day over max range."""
# 366 days
result = validate_date_range('2025-01-01', '2026-01-02')
assert result is not None
assert str(MAX_DATE_RANGE_DAYS) in result
def test_invalid_date_format(self):
"""Should reject invalid date format."""
@@ -132,13 +133,21 @@ class TestValidateLotInput:
assert '超過上限' in result
assert str(MAX_SERIAL_NUMBERS) in result
def test_exceeds_work_order_limit(self):
"""Should reject work orders exceeding limit."""
values = [f'WO{i:06d}' for i in range(MAX_WORK_ORDERS + 1)]
result = validate_lot_input('work_order', values)
assert result is not None
assert '超過上限' in result
assert str(MAX_WORK_ORDERS) in result
def test_exceeds_work_order_limit(self):
"""Should reject work orders exceeding limit."""
values = [f'WO{i:06d}' for i in range(MAX_WORK_ORDERS + 1)]
result = validate_lot_input('work_order', values)
assert result is not None
assert '超過上限' in result
assert str(MAX_WORK_ORDERS) in result
def test_exceeds_gd_work_order_limit(self):
"""Should reject GD work orders exceeding limit."""
values = [f'GD{i:06d}' for i in range(MAX_GD_WORK_ORDERS + 1)]
result = validate_lot_input('gd_work_order', values)
assert result is not None
assert '超過上限' in result
assert str(MAX_GD_WORK_ORDERS) in result
def test_exactly_at_limit(self):
"""Should accept values exactly at limit."""
@@ -451,9 +460,9 @@ class TestServiceConstants:
"""Batch size should be <= 1000 (Oracle limit)."""
assert BATCH_SIZE <= 1000
def test_max_date_range_is_reasonable(self):
"""Max date range should be 90 days."""
assert MAX_DATE_RANGE_DAYS == 90
def test_max_date_range_is_reasonable(self):
"""Max date range should be 365 days."""
assert MAX_DATE_RANGE_DAYS == 365
def test_max_lot_ids_is_reasonable(self):
"""Max LOT IDs should be sensible."""
@@ -463,9 +472,13 @@ class TestServiceConstants:
"""Max serial numbers should be sensible."""
assert 10 <= MAX_SERIAL_NUMBERS <= 100
def test_max_work_orders_is_reasonable(self):
"""Max work orders should be low due to expansion."""
assert MAX_WORK_ORDERS <= 20 # Work orders can expand to many LOTs
def test_max_work_orders_is_reasonable(self):
"""Max work orders should match API contract."""
assert MAX_WORK_ORDERS == 50
def test_max_gd_work_orders_is_reasonable(self):
"""Max GD work orders should match API contract."""
assert MAX_GD_WORK_ORDERS == 100
def test_max_equipments_is_reasonable(self):
"""Max equipments should be sensible."""

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from unittest.mock import patch
import pytest
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
from mes_dashboard.core.cache import NoOpCache
@@ -147,6 +148,49 @@ def test_seed_resolve_query_tool_rejects_reverse_only_type():
assert payload['error']['code'] == 'INVALID_PARAMS'
@pytest.mark.parametrize(
    ("resolve_type", "input_value"),
    [
        ("serial_number", "SN-001"),
        ("gd_work_order", "GD25060001"),
        ("gd_lot_id", "GD25060502-A11"),
    ],
)
@patch('mes_dashboard.routes.trace_routes.resolve_lots')
def test_seed_resolve_mid_section_defect_container_supports_reverse_input_types(
    mock_resolve_lots,
    resolve_type,
    input_value,
):
    """mid_section_defect container mode should accept reverse-lookup input types."""
    # Stub the resolver so route logic is exercised without any DB access.
    mock_resolve_lots.return_value = {
        'data': [
            {
                'container_id': 'CID-MSD',
                'lot_id': 'LOT-MSD',
            }
        ]
    }
    client = _client()
    response = client.post(
        '/api/trace/seed-resolve',
        json={
            'profile': 'mid_section_defect',
            'params': {
                'mode': 'container',
                'resolve_type': resolve_type,
                'values': [input_value],
            },
        },
    )
    assert response.status_code == 200
    payload = response.get_json()
    assert payload['stage'] == 'seed-resolve'
    assert payload['seed_count'] == 1
    assert payload['seeds'][0]['container_id'] == 'CID-MSD'
def test_seed_resolve_invalid_profile_returns_400():
client = _client()
response = client.post(