security: 完成安全稽核修復與測試配置更新

安全性改進:
- 新增 Session Cookie 安全設定 (SECURE, HTTPONLY, SAMESITE)
- 新增登入端點速率限制防止暴力破解攻擊 (5次/5分鐘)
- 將 dashboard_service 和 resource_service 的 print() 轉換為 logger
- 新增 CORS 環境變數配置範例

文件更新:
- README.md 新增使用者操作指南
- .gitignore 新增 Windows 特殊檔案 nul

測試修復:
- 修正壓力測試預設端口 (5000 → 8080)
- 修正壓力測試使用已發布頁面的標籤名稱
- 修正認證測試正確 mock LOCAL_AUTH_ENABLED

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2026-02-04 17:25:05 +08:00
parent de0904ffa3
commit 858427352c
10 changed files with 199 additions and 31 deletions

View File

@@ -142,3 +142,11 @@ WATCHDOG_STATE_FILE=/tmp/mes_dashboard_restart_state.json
# Cooldown period between restart requests in seconds (default: 60)
WORKER_RESTART_COOLDOWN=60
# ============================================================
# CORS Configuration
# ============================================================
# Comma-separated list of allowed origins for CORS
# Example: https://example.com,https://app.example.com
# Set to * for development (not recommended for production)
CORS_ALLOWED_ORIGINS=

3
.gitignore vendored
View File

@@ -28,6 +28,7 @@ build/
# OS
.DS_Store
Thumbs.db
nul
# Logs
*.log
@@ -50,4 +51,4 @@ htmlcov/
.ipynb_checkpoints/
# Note: openspec/ is tracked (not ignored)
tmp/
tmp/

View File

@@ -193,6 +193,63 @@ sudo systemctl start mes-dashboard-watchdog
---
## 使用者操作指南
本節提供一般使用者的操作說明。
### 存取系統
1. 開啟瀏覽器,輸入系統網址(預設為 `http://localhost:8080`
2. 進入 Portal 首頁,可透過上方 Tab 切換各功能模組
### 基本操作
#### WIP 即時概況
- 顯示生產線 WIP在製品的即時統計
- 可透過下拉選單篩選特定工作中心或產品線
- 點擊統計卡片可展開查看詳細明細
- 支援匯出 Excel 報表
#### WIP 明細查詢
1. 選擇篩選條件工作中心、Package、Hold 狀態、製程站點)
2. 點擊「查詢」按鈕執行查詢
3. 查詢結果顯示於下方表格
4. 點擊「匯出 Excel」下載報表
#### 設備狀態監控
- 顯示所有設備的即時狀態PRD/SBY/UDT/SDT/EGT/NST
- 使用階層篩選功能:
- **生產設備**:僅顯示列入生產統計的設備
- **重點設備**:僅顯示標記為重點監控的設備
- **監控設備**:僅顯示需特別監控的設備
- 設備狀態每 30 秒自動更新
#### 設備歷史查詢
1. 選擇查詢日期範圍
2. 可選擇特定設備或工作中心
3. 查看歷史趨勢圖表和稼動率熱力圖
4. 支援 CSV 匯出
### 管理員登入
1. 點擊右上角「登入」按鈕
2. 輸入工號和密碼(使用 LDAP 認證)
3. 登入後可存取開發中功能頁面
4. 管理員可使用效能監控儀表板(`/admin/performance`
### 常見問題
**Q: 頁面顯示「資料載入中」很久沒反應?**
A: 請檢查網路連線,或重新整理頁面。如持續發生請通知系統管理員。
**Q: 查詢結果與預期不符?**
A: 請確認篩選條件是否正確設定。資料來源為 MES 系統,約有 30 秒延遲。
**Q: 無法匯出 Excel**
A: 請確認瀏覽器允許下載檔案,並檢查查詢結果是否有資料。
---
## 功能說明
### Portal 入口頁面

View File

@@ -75,6 +75,14 @@ def create_app(config_name: str | None = None) -> Flask:
# Session configuration
app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-prod")
# Session cookie security settings
# SECURE: Only send cookie over HTTPS (disable for local development)
app.config['SESSION_COOKIE_SECURE'] = os.environ.get("FLASK_ENV") == "production"
# HTTPONLY: Prevent JavaScript access to session cookie (XSS protection)
app.config['SESSION_COOKIE_HTTPONLY'] = True
# SAMESITE: Prevent CSRF by restricting cross-site cookie sending
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# Configure logging first
_configure_logging(app)

View File

@@ -3,21 +3,83 @@
from __future__ import annotations
import logging
import time
from collections import defaultdict
from datetime import datetime
from threading import Lock
from flask import Blueprint, flash, redirect, render_template, request, session, url_for
from mes_dashboard.services.auth_service import authenticate, is_admin
logger = logging.getLogger('mes_dashboard.auth_routes')
auth_bp = Blueprint("auth", __name__, url_prefix="/admin")
# ============================================================
# Rate Limiting for Login Endpoint
# ============================================================
# Simple in-memory rate limiter to prevent brute force attacks
# Configuration: max 5 attempts per IP per 5 minutes
_rate_limit_lock = Lock()
_login_attempts: dict = defaultdict(list) # IP -> list of timestamps
RATE_LIMIT_MAX_ATTEMPTS = 5
RATE_LIMIT_WINDOW_SECONDS = 300 # 5 minutes
def _is_rate_limited(ip: str) -> bool:
"""Check if an IP address is rate limited.
Args:
ip: Client IP address.
Returns:
True if rate limited, False otherwise.
"""
current_time = time.time()
window_start = current_time - RATE_LIMIT_WINDOW_SECONDS
with _rate_limit_lock:
# Clean up old attempts
_login_attempts[ip] = [
ts for ts in _login_attempts[ip] if ts > window_start
]
# Check if limit exceeded
if len(_login_attempts[ip]) >= RATE_LIMIT_MAX_ATTEMPTS:
return True
return False
def _record_login_attempt(ip: str) -> None:
"""Record a login attempt for rate limiting.
Args:
ip: Client IP address.
"""
with _rate_limit_lock:
_login_attempts[ip].append(time.time())
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
"""Admin login page."""
error = None
if request.method == "POST":
# Rate limiting check
client_ip = request.remote_addr or "unknown"
if _is_rate_limited(client_ip):
logger.warning(f"Rate limit exceeded for IP: {client_ip}")
error = "登入嘗試過於頻繁,請稍後再試"
return render_template("login.html", error=error)
# Record this attempt
_record_login_attempt(client_ip)
username = request.form.get("username", "").strip()
password = request.form.get("password", "")

View File

@@ -5,9 +5,12 @@ Provides functions to query dashboard KPIs, workcenter cards,
resource details with job info, OU trends, and utilization heatmap.
"""
import logging
import pandas as pd
from typing import Optional, Dict, List, Any, Tuple
logger = logging.getLogger('mes_dashboard.dashboard_service')
from mes_dashboard.core.database import get_db_connection, read_sql_df
from mes_dashboard.core.utils import get_days_back, build_equipment_filter_sql
from mes_dashboard.config.constants import (
@@ -113,7 +116,7 @@ def query_dashboard_kpi(filters: Optional[Dict] = None) -> Optional[Dict]:
'run_pct': run_pct
}
except Exception as exc:
print(f"KPI query failed: {exc}")
logger.error(f"KPI query failed: {exc}")
return None
@@ -197,7 +200,7 @@ def query_workcenter_cards(filters: Optional[Dict] = None) -> Optional[List[Dict
return result
except Exception as exc:
print(f"Workcenter cards query failed: {exc}")
logger.error(f"Workcenter cards query failed: {exc}")
return None
@@ -321,7 +324,7 @@ def query_resource_detail_with_job(
return df, max_status_time
except Exception as exc:
print(f"Detail query failed: {exc}")
logger.error(f"Detail query failed: {exc}")
return None, None
@@ -403,9 +406,7 @@ def query_ou_trend(days: int = 7, filters: Optional[Dict] = None) -> Optional[Li
return result
except Exception as exc:
print(f"OU trend query failed: {exc}")
import traceback
traceback.print_exc()
logger.error(f"OU trend query failed: {exc}", exc_info=True)
return None
@@ -487,7 +488,5 @@ def query_utilization_heatmap(days: int = 7, filters: Optional[Dict] = None) ->
return result
except Exception as exc:
print(f"Utilization heatmap query failed: {exc}")
import traceback
traceback.print_exc()
logger.error(f"Utilization heatmap query failed: {exc}", exc_info=True)
return None

View File

@@ -4,9 +4,12 @@
Provides functions to query equipment status from DWH.DW_MES_RESOURCE and DWH.DW_MES_RESOURCESTATUS tables.
"""
import logging
import pandas as pd
from typing import Optional, Dict, List, Any
logger = logging.getLogger('mes_dashboard.resource_service')
from mes_dashboard.core.database import get_db_connection, read_sql_df
from mes_dashboard.core.utils import get_days_back, build_equipment_filter_sql
from mes_dashboard.config.constants import (
@@ -120,7 +123,7 @@ def query_resource_by_status(days_back: int = 30) -> Optional[pd.DataFrame]:
sql = sql.replace("{{ LATEST_STATUS_SUBQUERY }}", base_sql)
return read_sql_df(sql)
except Exception as exc:
print(f"Resource by status query failed: {exc}")
logger.error(f"Resource by status query failed: {exc}")
return None
@@ -139,7 +142,7 @@ def query_resource_by_workcenter(days_back: int = 30) -> Optional[pd.DataFrame]:
sql = sql.replace("{{ LATEST_STATUS_SUBQUERY }}", base_sql)
return read_sql_df(sql)
except Exception as exc:
print(f"Resource by workcenter query failed: {exc}")
logger.error(f"Resource by workcenter query failed: {exc}")
return None
@@ -216,7 +219,7 @@ def query_resource_detail(
return df
except Exception as exc:
print(f"Resource detail query failed: {exc}")
logger.error(f"Resource detail query failed: {exc}")
return None
@@ -243,7 +246,7 @@ def query_resource_workcenter_status_matrix(days_back: int = 30) -> Optional[pd.
sql = sql.replace("{{ LATEST_STATUS_SUBQUERY }}", base_sql)
return read_sql_df(sql)
except Exception as exc:
print(f"Resource status matrix query failed: {exc}")
logger.error(f"Resource status matrix query failed: {exc}")
return None
@@ -289,9 +292,7 @@ def query_resource_filter_options(days_back: int = 30) -> Optional[Dict]:
'assets_statuses': assets_statuses
}
except Exception as exc:
print(f"Resource filter options query failed: {exc}")
import traceback
traceback.print_exc()
logger.error(f"Resource filter options query failed: {exc}", exc_info=True)
return None

View File

@@ -20,7 +20,7 @@ from playwright.sync_api import Page, expect
def app_server() -> str:
"""Get the base URL for stress testing."""
import os
return os.environ.get('STRESS_TEST_URL', 'http://127.0.0.1:5000')
return os.environ.get('STRESS_TEST_URL', 'http://127.0.0.1:8080')
@pytest.fixture(scope="session")
@@ -265,11 +265,12 @@ class TestPageNavigationStress:
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
page.wait_for_timeout(500)
# Only use released pages that are visible without admin login
tabs = [
'.tab:has-text("WIP 即時概況")',
'.tab:has-text("機台狀態報表")',
'.tab:has-text("數據表查詢工具")',
'.tab:has-text("Excel 批次查詢")',
'.tab:has-text("設備即時概況")',
'.tab:has-text("設備歷史績效")',
'.tab:has-text("設備維修查詢")',
]
start_time = time.time()
@@ -292,12 +293,12 @@ class TestPageNavigationStress:
page.goto(app_server, wait_until='domcontentloaded', timeout=30000)
page.wait_for_timeout(500)
# Switch through all tabs
# Switch through released tabs (dev tabs hidden without admin login)
tabs = [
'WIP 即時概況',
'機台狀態報表',
'數據表查詢工具',
'Excel 批次查詢',
'設備即時概況',
'設備歷史績效',
'設備維修查詢',
]
for tab_name in tabs:

View File

@@ -69,9 +69,10 @@ class TestLoginRoute:
assert response.status_code == 200
assert "管理員登入" in response.data.decode("utf-8") or "login" in response.data.decode("utf-8").lower()
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_login_success(self, mock_post, client):
"""Test successful login."""
"""Test successful login via LDAP."""
# Mock LDAP response
mock_response = MagicMock()
mock_response.json.return_value = {
@@ -98,9 +99,10 @@ class TestLoginRoute:
assert "admin" in sess
assert sess["admin"]["username"] == "92367"
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_login_invalid_credentials(self, mock_post, client):
"""Test login with invalid credentials."""
"""Test login with invalid credentials via LDAP."""
mock_response = MagicMock()
mock_response.json.return_value = {"success": False}
mock_post.return_value = mock_response
@@ -114,9 +116,10 @@ class TestLoginRoute:
# Should show error message
assert "錯誤" in response.data.decode("utf-8") or "error" in response.data.decode("utf-8").lower()
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_login_non_admin_user(self, mock_post, client):
"""Test login with non-admin user."""
"""Test login with non-admin user via LDAP."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,

View File

@@ -12,11 +12,12 @@ from mes_dashboard.services import auth_service
class TestAuthenticate:
"""Tests for authenticate function."""
"""Tests for authenticate function via LDAP."""
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_authenticate_success(self, mock_post):
"""Test successful authentication."""
"""Test successful authentication via LDAP."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
@@ -36,9 +37,10 @@ class TestAuthenticate:
assert result["mail"] == "test@panjit.com.tw"
mock_post.assert_called_once()
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_authenticate_invalid_credentials(self, mock_post):
"""Test authentication with invalid credentials."""
"""Test authentication with invalid credentials via LDAP."""
mock_response = MagicMock()
mock_response.json.return_value = {"success": False}
mock_post.return_value = mock_response
@@ -47,6 +49,7 @@ class TestAuthenticate:
assert result is None
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_authenticate_timeout(self, mock_post):
"""Test authentication timeout handling."""
@@ -57,6 +60,7 @@ class TestAuthenticate:
assert result is None
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_authenticate_connection_error(self, mock_post):
"""Test authentication connection error handling."""
@@ -67,6 +71,7 @@ class TestAuthenticate:
assert result is None
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
@patch('mes_dashboard.services.auth_service.requests.post')
def test_authenticate_invalid_json(self, mock_post):
"""Test authentication with invalid JSON response."""
@@ -79,6 +84,29 @@ class TestAuthenticate:
assert result is None
class TestLocalAuthenticate:
"""Tests for local authentication."""
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', True)
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_USERNAME', 'testuser')
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_PASSWORD', 'testpass')
def test_local_auth_success(self):
"""Test successful local authentication."""
result = auth_service.authenticate("testuser", "testpass")
assert result is not None
assert result["username"] == "testuser"
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', True)
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_USERNAME', 'testuser')
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_PASSWORD', 'testpass')
def test_local_auth_wrong_password(self):
"""Test local authentication with wrong password."""
result = auth_service.authenticate("testuser", "wrongpass")
assert result is None
class TestIsAdmin:
"""Tests for is_admin function."""