chore: finalize vite migration hardening and watchdog logging

This commit is contained in:
beabigegg
2026-02-08 22:55:38 +08:00
parent c8e225101e
commit cf194bc3a3
27 changed files with 924 additions and 356 deletions

View File

@@ -102,9 +102,62 @@ class TestLoginRoute:
assert response.status_code == 302
# Check session contains admin
with client.session_transaction() as sess:
assert "admin" in sess
assert sess["admin"]["username"] == "92367"
with client.session_transaction() as sess:
assert "admin" in sess
assert sess["admin"]["username"] == "92367"
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_login_blocks_external_next_redirect(self, mock_post, _mock_is_admin, client):
    """Should ignore external next URL and redirect to portal."""
    # Simulate a successful upstream auth response so the login itself
    # succeeds and only the redirect-target handling is under test.
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "success": True,
        "user": {
            "username": "92367",
            "displayName": "Admin User",
            "mail": "ymirliu@panjit.com.tw",
            "department": "Test Dept",
        },
    }
    mock_post.return_value = mock_response
    # Attacker-controlled absolute URL in ?next= must not be honoured
    # (open-redirect protection in the login route).
    response = client.post(
        "/admin/login?next=https://evil.example/phish",
        data={"username": "92367", "password": "password123"},
        follow_redirects=False,
    )
    assert response.status_code == 302
    assert "evil.example" not in response.location
    # Fallback target is the portal root ("/") when next is rejected.
    assert response.location.endswith("/")
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_login_allows_internal_next_redirect(self, mock_post, _mock_is_admin, client):
    """Should keep validated local path in next URL."""
    # Same successful-auth stub as the external-redirect test; the only
    # variable here is the (safe, relative) next target.
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "success": True,
        "user": {
            "username": "92367",
            "displayName": "Admin User",
            "mail": "ymirliu@panjit.com.tw",
            "department": "Test Dept",
        },
    }
    mock_post.return_value = mock_response
    # A relative, same-site path is the accepted form of ?next=.
    response = client.post(
        "/admin/login?next=/admin/pages",
        data={"username": "92367", "password": "password123"},
        follow_redirects=False,
    )
    assert response.status_code == 302
    # The validated local path must survive the redirect round-trip.
    assert response.location.endswith("/admin/pages")
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')

View File

@@ -161,6 +161,24 @@ class TestLocalAuthenticate:
assert result is None
class TestLocalAuthSafetyGuard:
    """Tests for production guard on local auth toggle."""

    def test_resolve_local_auth_enabled_blocks_production(self):
        """Even an explicit "true" flag must be forced off in production."""
        enabled = auth_service._resolve_local_auth_enabled(
            raw_value="true",
            flask_env="production",
        )
        assert enabled is False

    def test_resolve_local_auth_enabled_allows_development(self):
        """A "true" flag is honoured when running in development."""
        enabled = auth_service._resolve_local_auth_enabled(
            raw_value="true",
            flask_env="development",
        )
        assert enabled is True
class TestIsAdmin:
"""Tests for is_admin function."""

View File

@@ -149,9 +149,9 @@ class TestLoadFullTable:
assert result is None
class TestUpdateRedisCache:
"""Test Redis cache update logic."""
class TestUpdateRedisCache:
"""Test Redis cache update logic."""
def test_update_redis_cache_success(self):
"""Test _update_redis_cache updates cache correctly."""
import mes_dashboard.core.cache_updater as cu
@@ -173,7 +173,10 @@ class TestUpdateRedisCache:
assert result is True
mock_pipeline.rename.assert_called_once()
mock_pipeline.execute.assert_called_once()
assert mock_pipeline.set.call_count == 3
for call in mock_pipeline.set.call_args_list:
assert call.kwargs.get("ex") == updater.interval * 3
def test_update_redis_cache_no_client(self):
"""Test _update_redis_cache handles no client."""
import mes_dashboard.core.cache_updater as cu
@@ -205,6 +208,26 @@ class TestUpdateRedisCache:
mock_client.delete.assert_called_once()
staged_key = mock_client.delete.call_args.args[0]
assert "staging" in staged_key
def test_update_redis_cache_ttl_override(self):
    """Configured TTL override should apply to all Redis keys."""
    import mes_dashboard.core.cache_updater as cu
    mock_client = MagicMock()
    mock_pipeline = MagicMock()
    mock_client.pipeline.return_value = mock_pipeline
    test_df = pd.DataFrame({'LOTID': ['LOT001'], 'QTY': [100]})
    # Force the module-level TTL override (42s) so it must win over any
    # interval-derived default (interval=600 would otherwise dominate).
    with patch.object(cu, 'WIP_CACHE_TTL_SECONDS', 42):
        with patch.object(cu, 'get_redis_client', return_value=mock_client):
            with patch.object(cu, 'get_key', side_effect=lambda k: f'mes_wip:{k}'):
                updater = cu.CacheUpdater(interval=600)
                result = updater._update_redis_cache(test_df, '2024-01-15 10:30:00')
    assert result is True
    # Three set() calls are expected — presumably the data, timestamp and
    # metadata keys; TODO confirm against cache_updater's key layout.
    assert mock_pipeline.set.call_count == 3
    # Every key written through the pipeline must carry the overridden expiry.
    for call in mock_pipeline.set.call_args_list:
        assert call.kwargs.get("ex") == 42
class TestCacheUpdateFlow:

View File

@@ -4,17 +4,25 @@
Tests the core service functions without database dependencies.
"""
import pytest
from mes_dashboard.services.excel_query_service import (
detect_excel_column_type,
escape_like_pattern,
build_like_condition,
build_date_range_condition,
validate_like_keywords,
sanitize_column_name,
validate_table_name,
LIKE_KEYWORD_LIMIT,
)
import pytest
from unittest.mock import MagicMock, patch
from mes_dashboard.services.excel_query_service import (
parse_excel,
get_column_unique_values,
execute_batch_query,
execute_advanced_batch_query,
detect_excel_column_type,
escape_like_pattern,
build_like_condition,
build_date_range_condition,
validate_like_keywords,
sanitize_column_name,
validate_table_name,
LIKE_KEYWORD_LIMIT,
PARSE_ERROR_MESSAGE,
COLUMN_READ_ERROR_MESSAGE,
QUERY_ERROR_MESSAGE,
)
class TestDetectExcelColumnType:
@@ -236,7 +244,7 @@ class TestSanitizeColumnName:
assert sanitize_column_name("COL; DROP TABLE--") == 'COLDROPTABLE'
class TestValidateTableName:
class TestValidateTableName:
"""Tests for validate_table_name function."""
def test_simple_name(self):
@@ -256,6 +264,65 @@ class TestValidateTableName:
assert validate_table_name('TABLE-NAME') is False
assert validate_table_name('TABLE NAME') is False
def test_sql_injection_prevention(self):
"""Should reject SQL injection attempts."""
assert validate_table_name('TABLE; DROP--') is False
def test_sql_injection_prevention(self):
"""Should reject SQL injection attempts."""
assert validate_table_name('TABLE; DROP--') is False
class TestErrorLeakageProtection:
    """Tests for exception detail masking in excel-query service."""

    @patch("mes_dashboard.services.excel_query_service.pd.read_excel")
    def test_parse_excel_masks_internal_error_details(self, mock_read_excel):
        # Parser failures must surface the generic message, never the
        # underlying library's detail.
        mock_read_excel.side_effect = RuntimeError("openpyxl stack trace detail")
        result = parse_excel(MagicMock())
        assert result["error"] == PARSE_ERROR_MESSAGE
        assert "openpyxl" not in result["error"]

    @patch("mes_dashboard.services.excel_query_service.pd.read_excel")
    def test_get_column_unique_values_masks_internal_error_details(self, mock_read_excel):
        # Column-read errors get the same generic-message treatment.
        mock_read_excel.side_effect = RuntimeError("internal parser detail")
        result = get_column_unique_values(MagicMock(), "LOT_ID")
        assert result["error"] == COLUMN_READ_ERROR_MESSAGE
        assert "internal parser detail" not in result["error"]

    @patch("mes_dashboard.services.excel_query_service.get_db_connection")
    def test_execute_batch_query_masks_internal_error_details(self, mock_get_db):
        # A realistic Oracle error code must not leak through the query API.
        mock_cursor = MagicMock()
        mock_cursor.execute.side_effect = RuntimeError("ORA-00942: table missing")
        mock_conn = MagicMock()
        mock_conn.cursor.return_value = mock_cursor
        mock_get_db.return_value = mock_conn
        result = execute_batch_query(
            table_name="DWH.DW_MES_WIP",
            search_column="LOT_ID",
            return_columns=["LOT_ID"],
            search_values=["LOT001"],
        )
        assert result["error"] == QUERY_ERROR_MESSAGE
        assert "ORA-00942" not in result["error"]

    @patch("mes_dashboard.services.excel_query_service.get_db_connection")
    def test_execute_advanced_batch_query_masks_internal_error_details(self, mock_get_db):
        # Same masking contract for the advanced query path.
        mock_cursor = MagicMock()
        mock_cursor.execute.side_effect = RuntimeError("sensitive sql context")
        mock_conn = MagicMock()
        mock_conn.cursor.return_value = mock_cursor
        mock_get_db.return_value = mock_conn
        result = execute_advanced_batch_query(
            table_name="DWH.DW_MES_WIP",
            search_column="LOT_ID",
            return_columns=["LOT_ID"],
            search_values=["LOT001"],
            query_type="in",
        )
        assert result["error"] == QUERY_ERROR_MESSAGE
        assert "sensitive sql context" not in result["error"]

View File

@@ -74,15 +74,17 @@ class TestGetResources:
data = json.loads(response.data)
assert 'error' in data
@patch('mes_dashboard.services.resource_cache.get_all_resources')
def test_get_resources_exception(self, mock_get_resources, client):
"""Should handle exception gracefully."""
mock_get_resources.side_effect = Exception('Database error')
response = client.get('/api/job-query/resources')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
@patch('mes_dashboard.services.resource_cache.get_all_resources')
def test_get_resources_exception(self, mock_get_resources, client):
    """Should handle exception gracefully."""
    # Use a realistic Oracle credential error to prove DB details never
    # leak into the API response body.
    mock_get_resources.side_effect = Exception('ORA-01017 invalid username/password')
    response = client.get('/api/job-query/resources')
    assert response.status_code == 500
    data = json.loads(response.data)
    assert 'error' in data
    # Generic user-facing message ("service temporarily unavailable").
    assert data['error'] == '服務暫時無法使用'
    assert 'ORA-01017' not in data['error']
class TestQueryJobs:

View File

@@ -4,14 +4,19 @@
Tests the core service functions without database dependencies.
"""
import pytest
from mes_dashboard.services.job_query_service import (
validate_date_range,
_build_resource_filter,
_build_resource_filter_sql,
BATCH_SIZE,
MAX_DATE_RANGE_DAYS,
)
import pytest
from unittest.mock import patch
from mes_dashboard.services.job_query_service import (
validate_date_range,
_build_resource_filter,
_build_resource_filter_sql,
get_jobs_by_resources,
export_jobs_with_history,
BATCH_SIZE,
MAX_DATE_RANGE_DAYS,
QUERY_ERROR_MESSAGE,
EXPORT_ERROR_MESSAGE,
)
class TestValidateDateRange:
@@ -77,94 +82,125 @@ class TestValidateDateRange:
assert '格式' in result or 'format' in result.lower()
class TestBuildResourceFilter:
"""Tests for _build_resource_filter function."""
def test_empty_list(self):
"""Should return empty list for empty input."""
result = _build_resource_filter([])
assert result == []
def test_single_id(self):
"""Should return single chunk for single ID."""
result = _build_resource_filter(['RES001'])
assert len(result) == 1
assert result[0] == "'RES001'"
def test_multiple_ids(self):
"""Should join multiple IDs with comma."""
result = _build_resource_filter(['RES001', 'RES002', 'RES003'])
assert len(result) == 1
assert "'RES001'" in result[0]
assert "'RES002'" in result[0]
assert "'RES003'" in result[0]
def test_chunking(self):
"""Should chunk when exceeding batch size."""
# Create more than BATCH_SIZE IDs
ids = [f'RES{i:05d}' for i in range(BATCH_SIZE + 10)]
result = _build_resource_filter(ids)
assert len(result) == 2
# First chunk should have BATCH_SIZE items
assert result[0].count("'") == BATCH_SIZE * 2 # 2 quotes per ID
def test_escape_single_quotes(self):
"""Should escape single quotes in IDs."""
result = _build_resource_filter(["RES'001"])
assert len(result) == 1
assert "RES''001" in result[0] # Escaped
def test_custom_chunk_size(self):
"""Should respect custom chunk size."""
ids = ['RES001', 'RES002', 'RES003', 'RES004', 'RES005']
result = _build_resource_filter(ids, max_chunk_size=2)
class TestBuildResourceFilter:
"""Tests for _build_resource_filter function."""
def test_empty_list(self):
"""Should return empty list for empty input."""
result = _build_resource_filter([])
assert result == []
def test_single_id(self):
"""Should return single chunk for single ID."""
result = _build_resource_filter(['RES001'])
assert len(result) == 1
assert result[0] == ['RES001']
def test_multiple_ids(self):
"""Should join multiple IDs with comma."""
result = _build_resource_filter(['RES001', 'RES002', 'RES003'])
assert len(result) == 1
assert result[0] == ['RES001', 'RES002', 'RES003']
def test_chunking(self):
"""Should chunk when exceeding batch size."""
# Create more than BATCH_SIZE IDs
ids = [f'RES{i:05d}' for i in range(BATCH_SIZE + 10)]
result = _build_resource_filter(ids)
assert len(result) == 2
# First chunk should have BATCH_SIZE items
assert len(result[0]) == BATCH_SIZE
def test_preserve_id_value_without_sql_interpolation(self):
"""Should keep raw value and defer safety to bind variables."""
result = _build_resource_filter(["RES'001"])
assert len(result) == 1
assert result[0] == ["RES'001"]
def test_custom_chunk_size(self):
"""Should respect custom chunk size."""
ids = ['RES001', 'RES002', 'RES003', 'RES004', 'RES005']
result = _build_resource_filter(ids, max_chunk_size=2)
assert len(result) == 3 # 2+2+1
class TestBuildResourceFilterSql:
"""Tests for _build_resource_filter_sql function."""
def test_empty_list(self):
"""Should return 1=0 for empty input (no results)."""
result = _build_resource_filter_sql([])
assert result == "1=0"
def test_single_id(self):
"""Should build simple IN clause for single ID."""
result = _build_resource_filter_sql(['RES001'])
assert "j.RESOURCEID IN" in result
assert "'RES001'" in result
def test_multiple_ids(self):
"""Should build IN clause with multiple IDs."""
result = _build_resource_filter_sql(['RES001', 'RES002'])
assert "j.RESOURCEID IN" in result
assert "'RES001'" in result
assert "'RES002'" in result
def test_custom_column(self):
"""Should use custom column name."""
result = _build_resource_filter_sql(['RES001'], column='r.ID')
assert "r.ID IN" in result
class TestBuildResourceFilterSql:
"""Tests for _build_resource_filter_sql function."""
def test_empty_list(self):
"""Should return 1=0 for empty input (no results)."""
result = _build_resource_filter_sql([])
assert result == "1=0"
def test_single_id(self):
"""Should build IN clause with bind variable for single ID."""
result, params = _build_resource_filter_sql(['RES001'], return_params=True)
assert "j.RESOURCEID IN" in result
assert ":p0" in result
assert params["p0"] == "RES001"
assert "RES001" not in result
def test_multiple_ids(self):
"""Should build IN clause with multiple bind variables."""
result, params = _build_resource_filter_sql(['RES001', 'RES002'], return_params=True)
assert "j.RESOURCEID IN" in result
assert ":p0" in result
assert ":p1" in result
assert params["p0"] == "RES001"
assert params["p1"] == "RES002"
def test_custom_column(self):
"""Should use custom column name."""
result = _build_resource_filter_sql(['RES001'], column='r.ID')
assert "r.ID IN" in result
def test_large_list_uses_or(self):
"""Should use OR for chunked results."""
# Create more than BATCH_SIZE IDs
ids = [f'RES{i:05d}' for i in range(BATCH_SIZE + 10)]
result = _build_resource_filter_sql(ids)
assert " OR " in result
# Should have parentheses wrapping the OR conditions
assert result.startswith("(")
assert result.endswith(")")
result = _build_resource_filter_sql(ids)
assert " OR " in result
# Should have parentheses wrapping the OR conditions
assert result.startswith("(")
assert result.endswith(")")
def test_sql_injection_payload_stays_in_params(self):
"""Injection payload should never be interpolated into SQL text."""
payload = "RES001' OR '1'='1"
sql, params = _build_resource_filter_sql([payload], return_params=True)
assert payload in params.values()
assert payload not in sql
class TestServiceConstants:
class TestServiceConstants:
"""Tests for service constants."""
def test_batch_size_is_reasonable(self):
"""Batch size should be <= 1000 (Oracle limit)."""
assert BATCH_SIZE <= 1000
def test_max_date_range_is_year(self):
"""Max date range should be 365 days."""
assert MAX_DATE_RANGE_DAYS == 365
def test_max_date_range_is_year(self):
"""Max date range should be 365 days."""
assert MAX_DATE_RANGE_DAYS == 365
class TestErrorLeakageProtection:
    """Tests for exception detail masking in job-query service."""

    @patch("mes_dashboard.services.job_query_service.read_sql_df")
    def test_query_error_masks_internal_details(self, mock_read):
        # DB-layer failures must map to the generic query error message.
        mock_read.side_effect = RuntimeError("ORA-00942: table or view does not exist")
        result = get_jobs_by_resources(["RES001"], "2024-01-01", "2024-01-31")
        assert result["error"] == QUERY_ERROR_MESSAGE
        assert "ORA-00942" not in result["error"]

    @patch("mes_dashboard.services.job_query_service.read_sql_df")
    def test_export_stream_error_masks_internal_details(self, mock_read):
        # The export appears to be a generator; joining drains the stream so
        # any error marker emitted mid-stream can be inspected as text.
        mock_read.side_effect = RuntimeError("sensitive sql context")
        output = "".join(export_jobs_with_history(["RES001"], "2024-01-01", "2024-01-31"))
        assert EXPORT_ERROR_MESSAGE in output
        assert "sensitive sql context" not in output

View File

@@ -3,6 +3,18 @@
from __future__ import annotations
from unittest.mock import patch
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
def _client():
    """Build a fresh Flask test client with the cached DB engine cleared."""
    # Drop any SQLAlchemy engine left over from earlier tests so each
    # client starts from a clean database state.
    db._ENGINE = None
    flask_app = create_app("testing")
    flask_app.config["TESTING"] = True
    return flask_app.test_client()
def test_clean_nan_values_handles_deep_nesting_without_recursion_error():
from mes_dashboard.routes.resource_routes import _clean_nan_values
@@ -30,3 +42,33 @@ def test_clean_nan_values_breaks_cycles_safely():
cleaned = _clean_nan_values(payload)
assert cleaned["name"] == "root"
assert cleaned["self"] is None
@patch(
    "mes_dashboard.routes.resource_routes.get_resource_status_summary",
    side_effect=RuntimeError("ORA-00942: table or view does not exist"),
)
def test_resource_status_summary_masks_internal_error_details(_mock_summary):
    """A failing summary service yields a generic 500 envelope, no DB detail."""
    response = _client().get("/api/resource/status/summary")
    assert response.status_code == 500
    payload = response.get_json()
    assert payload["success"] is False
    assert payload["error"]["code"] == "INTERNAL_ERROR"
    # Generic message ("service temporarily unavailable").
    assert payload["error"]["message"] == "服務暫時無法使用"
    # Scan the whole payload so the Oracle code cannot hide in a nested field.
    assert "ORA-00942" not in str(payload)
@patch(
    "mes_dashboard.routes.resource_routes.get_merged_resource_status",
    side_effect=RuntimeError("sensitive sql context"),
)
def test_resource_status_masks_internal_error_details(_mock_status):
    """A failing status service yields a generic 500 envelope, no SQL detail."""
    response = _client().get("/api/resource/status")
    assert response.status_code == 500
    payload = response.get_json()
    assert payload["success"] is False
    assert payload["error"]["code"] == "INTERNAL_ERROR"
    # Generic message ("service temporarily unavailable").
    assert payload["error"]["message"] == "服務暫時無法使用"
    # Scan the whole payload, not just the message field.
    assert "sensitive sql context" not in str(payload)

View File

@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
"""Unit tests for watchdog logging helpers."""
from __future__ import annotations
import logging
from unittest.mock import patch
from mes_dashboard.core.watchdog_logging import attach_sqlite_log_handler
def _reset_logger(logger: logging.Logger) -> None:
logger.handlers.clear()
if hasattr(logger, "_watchdog_sqlite_handler_registered"):
delattr(logger, "_watchdog_sqlite_handler_registered")
def test_attach_sqlite_log_handler_enabled_attaches_once():
    """Attaching twice registers the SQLite handler exactly once."""
    target = logging.getLogger("mes_dashboard.watchdog.test.enabled")
    _reset_logger(target)
    primary = logging.NullHandler()
    spare = logging.NullHandler()
    with patch("mes_dashboard.core.log_store.LOG_STORE_ENABLED", True):
        with patch(
            "mes_dashboard.core.log_store.get_sqlite_log_handler",
            side_effect=[primary, spare],
        ) as factory:
            assert attach_sqlite_log_handler(target) is True
            # Second attach is a no-op: nothing returned, factory untouched.
            assert attach_sqlite_log_handler(target) is False
            assert factory.call_count == 1
    assert primary in target.handlers
    assert spare not in target.handlers
    _reset_logger(target)
def test_attach_sqlite_log_handler_disabled_skips_factory():
    """With the log store disabled, attach declines without calling the factory."""
    target = logging.getLogger("mes_dashboard.watchdog.test.disabled")
    _reset_logger(target)
    with patch("mes_dashboard.core.log_store.LOG_STORE_ENABLED", False):
        with patch("mes_dashboard.core.log_store.get_sqlite_log_handler") as factory:
            assert attach_sqlite_log_handler(target) is False
            factory.assert_not_called()
    assert target.handlers == []
    _reset_logger(target)
def test_attach_sqlite_log_handler_handles_handler_errors():
    """A failing handler factory is tolerated: False returned, nothing attached."""
    target = logging.getLogger("mes_dashboard.watchdog.test.error")
    _reset_logger(target)
    with patch("mes_dashboard.core.log_store.LOG_STORE_ENABLED", True):
        with patch(
            "mes_dashboard.core.log_store.get_sqlite_log_handler",
            side_effect=RuntimeError("boom"),
        ):
            assert attach_sqlite_log_handler(target) is False
    assert target.handlers == []
    _reset_logger(target)