214 lines
7.1 KiB
Python
214 lines
7.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Runtime hardening tests for startup security, CSRF, and shutdown lifecycle."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
import mes_dashboard.core.database as db
|
|
from mes_dashboard.app import create_app
|
|
from mes_dashboard.routes.health_routes import check_database
|
|
|
|
|
|
@pytest.fixture
def testing_app_factory(monkeypatch):
    """Return a callable that builds a fresh app from the ``"testing"`` config.

    The returned callable accepts one keyword-only flag, ``csrf_enabled``
    (default ``False``), which is copied into ``app.config["CSRF_ENABLED"]``.
    Before each build it sets ``REALTIME_EQUIPMENT_CACHE_ENABLED`` to
    ``"false"``, clears the recorded login attempts in ``auth_routes`` and
    resets the module-level engine references on ``db`` to ``None``.
    """

    def _build_app(*, csrf_enabled: bool = False):
        # Imported lazily, exactly where the factory needs it.
        from mes_dashboard.routes import auth_routes

        monkeypatch.setenv("REALTIME_EQUIPMENT_CACHE_ENABLED", "false")

        # Clear the login-attempt bookkeeping while holding its lock.
        with auth_routes._rate_limit_lock:
            auth_routes._login_attempts.clear()

        # Reset both engine references before the app is created.
        db._ENGINE = db._HEALTH_ENGINE = None

        built = create_app("testing")
        built.config["TESTING"] = True
        built.config["CSRF_ENABLED"] = csrf_enabled
        return built

    return _build_app
|
|
|
|
|
|
def _shutdown(app):
    """Invoke the app's ``runtime_shutdown`` extension if it is callable.

    Does nothing when the ``"runtime_shutdown"`` key is absent from
    ``app.extensions`` or when the stored value is not callable.
    """
    hook = app.extensions.get("runtime_shutdown")
    if not callable(hook):
        return
    hook()
|
|
|
|
|
|
def test_production_requires_secret_key(monkeypatch):
    """``create_app("production")`` must raise RuntimeError without SECRET_KEY."""
    # Environment for this test: no SECRET_KEY, cache flag set to "false".
    monkeypatch.setenv("REALTIME_EQUIPMENT_CACHE_ENABLED", "false")
    monkeypatch.delenv("SECRET_KEY", raising=False)

    # Reset both module-level engine references before building the app.
    db._ENGINE = db._HEALTH_ENGINE = None

    # The error message is expected to mention the missing variable.
    with pytest.raises(RuntimeError, match="SECRET_KEY"):
        create_app("production")
|
|
|
|
|
|
def test_login_post_rejects_missing_csrf(testing_app_factory):
    """A login POST without a CSRF token is rejected with HTTP 403.

    The app is built with ``csrf_enabled=True`` and the form is posted with
    no ``csrf_token`` field; the response body must mention ``CSRF``.
    """
    app = testing_app_factory(csrf_enabled=True)
    try:
        client = app.test_client()

        response = client.post(
            "/admin/login",
            data={"username": "user", "password": "pw"},
        )

        assert response.status_code == 403
        assert "CSRF" in response.get_data(as_text=True)
    finally:
        # Run the shutdown hook even when an assertion fails, so a failing
        # test does not leave the app's runtime resources alive.
        _shutdown(app)
|
|
|
|
|
|
def test_login_success_rotates_session_and_clears_legacy_state(testing_app_factory):
    """A successful login replaces the session contents and the CSRF token.

    The session is seeded with a known CSRF token and an unrelated ``legacy``
    key. After a successful (mocked) admin login the ``legacy`` key must be
    gone, an ``admin`` entry must be present, and the CSRF token must exist
    but differ from the seeded value.
    """
    app = testing_app_factory(csrf_enabled=True)
    try:
        client = app.test_client()

        # Open a session, then seed it with a known token and a stale key.
        client.get("/admin/login")
        with client.session_transaction() as sess:
            sess["_csrf_token"] = "seed-token"
            sess["legacy"] = "keep-me-out"

        with (
            patch("mes_dashboard.routes.auth_routes.authenticate") as mock_auth,
            patch("mes_dashboard.routes.auth_routes.is_admin", return_value=True),
        ):
            mock_auth.return_value = {
                "username": "admin",
                "displayName": "Admin",
                "mail": "admin@example.com",
                "department": "IT",
            }

            response = client.post(
                "/admin/login?next=/",
                data={
                    "username": "admin",
                    "password": "secret",
                    "csrf_token": "seed-token",
                },
                follow_redirects=False,
            )

        assert response.status_code == 302

        with client.session_transaction() as sess:
            assert "legacy" not in sess
            assert "admin" in sess
            assert sess.get("_csrf_token")
            assert sess.get("_csrf_token") != "seed-token"
    finally:
        # Run the shutdown hook even when an assertion fails, so a failing
        # test does not leave the app's runtime resources alive.
        _shutdown(app)
|
|
|
|
|
|
def test_runtime_shutdown_hook_invokes_all_cleanup_handlers(testing_app_factory):
    """The ``runtime_shutdown`` hook calls each patched cleanup handler once.

    The four cleanup callables looked up on ``mes_dashboard.app`` are replaced
    by mocks, the hook stored in ``app.extensions`` is invoked, and each mock
    must have been called exactly once.
    """
    app = testing_app_factory(csrf_enabled=False)

    try:
        with (
            patch("mes_dashboard.app.stop_cache_updater") as mock_cache_stop,
            patch("mes_dashboard.app.stop_equipment_status_sync_worker") as mock_sync_stop,
            patch("mes_dashboard.app.close_redis") as mock_close_redis,
            patch("mes_dashboard.app.dispose_engine") as mock_dispose_engine,
        ):
            app.extensions["runtime_shutdown"]()

        mock_cache_stop.assert_called_once()
        mock_sync_stop.assert_called_once()
        mock_close_redis.assert_called_once()
        mock_dispose_engine.assert_called_once()
    finally:
        # The hook above ran with the handlers patched out; invoke it again
        # unpatched, and do so even if one of the mock assertions failed.
        _shutdown(app)
|
|
|
|
|
|
def test_health_check_uses_dedicated_health_engine():
    """``check_database()`` obtains its connection through ``get_health_engine``.

    With ``get_health_engine`` patched, the check must report ``("ok", None)``,
    call the patched getter exactly once, and enter the connection context
    manager exactly once.
    """
    target = "mes_dashboard.routes.health_routes.get_health_engine"
    with patch(target) as engine_getter:
        # Context manager returned by <engine>.connect() on the mocked engine.
        connection_cm = engine_getter.return_value.connect.return_value
        status, error = check_database()

    assert status == "ok"
    assert error is None
    engine_getter.assert_called_once()
    connection_cm.__enter__.assert_called_once()
|
|
|
|
|
|
@patch("mes_dashboard.routes.health_routes.check_database", return_value=("ok", None))
@patch("mes_dashboard.routes.health_routes.check_redis", return_value=("ok", None))
@patch("mes_dashboard.routes.health_routes.get_route_cache_status", return_value={"mode": "l1+l2", "degraded": False})
@patch("mes_dashboard.routes.health_routes.get_pool_status", return_value={"saturation": 1.0})
@patch("mes_dashboard.core.circuit_breaker.get_circuit_breaker_status", return_value={"state": "CLOSED"})
def test_health_reports_pool_saturation_degraded_reason(
    _mock_circuit,
    _mock_pool_status,
    _mock_route_cache,
    _mock_redis,
    _mock_db,
    testing_app_factory,
):
    """``/health`` reports ``db_pool_saturated`` when pool saturation is 1.0.

    Every dependency probe is patched to a healthy value except the pool
    status, which reports ``saturation == 1.0``. The endpoint must still answer
    200, name the pool as the degraded reason, and recommend a controlled
    worker restart.

    Mock parameters are injected bottom-up: the decorator closest to ``def``
    supplies the first argument.
    """
    app = testing_app_factory(csrf_enabled=False)
    try:
        response = app.test_client().get("/health")

        assert response.status_code == 200
        payload = response.get_json()
        assert payload["degraded_reason"] == "db_pool_saturated"
        assert payload["resilience"]["recovery_recommendation"]["action"] == "consider_controlled_worker_restart"
    finally:
        # Run the shutdown hook even when an assertion fails, so a failing
        # test does not leave the app's runtime resources alive.
        _shutdown(app)
|
|
|
|
|
|
def test_security_headers_applied_globally(testing_app_factory):
    """A response for ``/`` carries the expected security headers.

    Checks the Content-Security-Policy (``frame-ancestors 'self'`` present,
    ``'unsafe-eval'`` absent), ``X-Frame-Options``, ``X-Content-Type-Options``
    and the presence of ``Referrer-Policy``.
    """
    app = testing_app_factory(csrf_enabled=False)
    try:
        response = app.test_client().get("/", follow_redirects=True)

        assert response.status_code == 200
        # Assert presence first so a missing header fails as an assertion
        # rather than as a KeyError on the lookup below.
        assert "Content-Security-Policy" in response.headers
        csp = response.headers["Content-Security-Policy"]
        assert "frame-ancestors 'self'" in csp
        assert "'unsafe-eval'" not in csp
        assert response.headers["X-Frame-Options"] == "SAMEORIGIN"
        assert response.headers["X-Content-Type-Options"] == "nosniff"
        assert "Referrer-Policy" in response.headers
    finally:
        # Run the shutdown hook even when an assertion fails, so a failing
        # test does not leave the app's runtime resources alive.
        _shutdown(app)
|
|
|
|
|
|
def test_hsts_header_enabled_in_production(monkeypatch):
    """An app built from the ``"production"`` config sends an HSTS header.

    The app is created directly (not through the testing factory) with a
    SECRET_KEY supplied and ``RUNTIME_CONTRACT_ENFORCE`` set to ``"false"``.
    """
    monkeypatch.setenv("SECRET_KEY", "test-production-secret-key")
    monkeypatch.setenv("REALTIME_EQUIPMENT_CACHE_ENABLED", "false")
    monkeypatch.setenv("RUNTIME_CONTRACT_ENFORCE", "false")
    # Reset both module-level engine references before building the app.
    db._ENGINE = None
    db._HEALTH_ENGINE = None

    app = create_app("production")
    try:
        app.config["TESTING"] = True
        response = app.test_client().get("/", follow_redirects=True)

        assert response.status_code == 200
        assert "Strict-Transport-Security" in response.headers
    finally:
        # Run the shutdown hook even when an assertion fails, so a failing
        # test does not leave the app's runtime resources alive.
        _shutdown(app)
|
|
|
|
|
|
def test_csp_unsafe_eval_can_be_enabled_via_env(monkeypatch):
    """Setting ``CSP_ALLOW_UNSAFE_EVAL=true`` adds ``'unsafe-eval'`` to the CSP."""
    monkeypatch.setenv("CSP_ALLOW_UNSAFE_EVAL", "true")
    # Build app directly to control env behavior.
    monkeypatch.setenv("REALTIME_EQUIPMENT_CACHE_ENABLED", "false")
    # Reset both module-level engine references before building the app.
    db._ENGINE = None
    db._HEALTH_ENGINE = None

    app = create_app("testing")
    try:
        app.config["TESTING"] = True

        response = app.test_client().get("/", follow_redirects=True)
        assert response.status_code == 200
        assert "'unsafe-eval'" in response.headers["Content-Security-Policy"]
    finally:
        # Run the shutdown hook even when an assertion fails, so a failing
        # test does not leave the app's runtime resources alive.
        _shutdown(app)
|
|
|
|
|
|
def test_production_trusted_proxy_requires_allowlist(monkeypatch):
    """Production start-up fails when proxy headers are trusted without an allowlist.

    With ``TRUST_PROXY_HEADERS`` set to ``"true"`` and ``TRUSTED_PROXY_IPS``
    unset, ``create_app("production")`` must raise a RuntimeError whose message
    mentions ``TRUSTED_PROXY_IPS``.
    """
    env_overrides = {
        "SECRET_KEY": "test-production-secret-key",
        "REALTIME_EQUIPMENT_CACHE_ENABLED": "false",
        "RUNTIME_CONTRACT_ENFORCE": "false",
        "TRUST_PROXY_HEADERS": "true",
    }
    for var_name, var_value in env_overrides.items():
        monkeypatch.setenv(var_name, var_value)
    monkeypatch.delenv("TRUSTED_PROXY_IPS", raising=False)

    # Reset both module-level engine references before building the app.
    db._ENGINE = db._HEALTH_ENGINE = None

    with pytest.raises(RuntimeError, match="TRUSTED_PROXY_IPS"):
        create_app("production")
|