feat: introduce Redis cache layer and worker stability hardening

- Add a Redis table-level cache to reduce Oracle query load
- Implement a CacheUpdater background task that checks SYS_DATE every 10 minutes and refreshes the cache
- Move all WIP API endpoints to a cache-first + Oracle-fallback architecture
- Add a /health endpoint reporting database, Redis, and cache status
- Add a live health-status indicator to the frontend Portal
- Set call_timeout=55s on SQLAlchemy connections to prevent worker hangs
- Add max_requests=1000 to Gunicorn so workers restart periodically
- Full test coverage: 67 unit/integration/E2E tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
beabigegg
2026-01-29 11:38:30 +08:00
parent d5f0df3384
commit 3234c7088a
28 changed files with 4273 additions and 71 deletions

View File

@@ -57,3 +57,18 @@ GUNICORN_WORKERS=2
# Threads per worker
GUNICORN_THREADS=4
# ============================================================
# Redis Configuration (for WIP cache)
# ============================================================
# Redis connection URL
REDIS_URL=redis://localhost:6379/0
# Enable/disable the Redis cache (set to false to fall back to Oracle)
REDIS_ENABLED=true
# Redis key prefix (to separate from other applications)
REDIS_KEY_PREFIX=mes_wip
# Cache check interval in seconds (default: 600 = 10 minutes)
CACHE_CHECK_INTERVAL=600
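These flags might be read on the application side roughly as follows (a sketch; `env_bool` and the module layout are illustrative, not taken from the repo):

```python
import os

def env_bool(name: str, default: bool) -> bool:
    """Parse a boolean environment variable the way shell-style configs expect."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes", "on")

# Defaults mirror the values documented above
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
REDIS_ENABLED = env_bool("REDIS_ENABLED", True)
REDIS_KEY_PREFIX = os.getenv("REDIS_KEY_PREFIX", "mes_wip")
CACHE_CHECK_INTERVAL = int(os.getenv("CACHE_CHECK_INTERVAL", "600"))
```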

View File

@@ -6,10 +6,14 @@ threads = int(os.getenv("GUNICORN_THREADS", "4"))
worker_class = "gthread"
# Timeout settings - critical for dashboard stability
timeout = 65  # Worker timeout: must be > call_timeout (55s); raised from 60
graceful_timeout = 10 # Graceful shutdown timeout (reduced for faster restart)
keepalive = 5 # Keep-alive connections timeout
# Worker lifecycle management - prevent state accumulation
max_requests = 1000 # Restart worker after N requests
max_requests_jitter = 100 # Random jitter to prevent simultaneous restarts
# ============================================================
# Worker Lifecycle Hooks

View File

@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-01-29

View File

@@ -0,0 +1,361 @@
## Context
### Background
MES Dashboard is a Flask application that provides a real-time WIP (work-in-process) monitoring dashboard. All WIP-related queries go to the Oracle view `DWH.DW_PJ_LOT_V`, which the DWH refreshes every 20 minutes.
### Current Architecture
```
Browser ──(refresh every 10 min)──▶ Flask API ──(query per request)──▶ Oracle DW_PJ_LOT_V
```
### Problems
1. **High Oracle query load**: the data only changes every 20 minutes, yet every API request queries Oracle.
2. **Workers hang without recovery**: SQLAlchemy connections have no `call_timeout`, and the gthread worker's timeout mechanism does not apply to worker threads.
3. **Low service availability**: when all workers are exhausted, the only remedy is a manual restart.
### Stakeholders
- MES team: uses the WIP Dashboard to monitor production
- IT operations: responsible for service stability
- DBA: concerned with Oracle query load
## Goals / Non-Goals
### Goals
1. **Reduce Oracle query load**: with a table-level Redis cache, Oracle is queried at most once every 10 minutes (by a background task).
2. **Improve service stability**: hung workers recover automatically or are restarted periodically.
3. **Preserve data freshness**: the cache stays consistent with Oracle (based on SYS_DATE checks).
4. **Graceful degradation**: fall back automatically to querying Oracle directly when Redis is unavailable.
### Non-Goals
1. **No frontend refresh changes**: the frontend keeps its 10-minute auto-refresh.
2. **No caching of other tables**: only `DW_PJ_LOT_V` is cached.
3. **No Redis cluster**: a single-node Redis is sufficient.
4. **No API changes**: all API request/response formats stay the same.
## Decisions
### Decision 1: Table-level cache vs. per-API result cache
**Choice**: table-level cache (cache the entire `DW_PJ_LOT_V` in Redis)
**Alternative**:
- Per-API result cache: a separate cache entry per API + filter combination
**Rationale**:
- Simple cache logic; only one copy of the data to maintain
- Good consistency: all APIs read the same cached data
- Few Redis keys (3 vs. dozens)
- Simple update logic: SYS_DATE changes → reload the whole table
**Trade-offs**:
- Filtering/aggregation must happen in the Python layer
- Redis needs enough memory to hold the whole table
---
### Decision 2: Cache data format
**Choice**: store as JSON
**Alternatives**:
- MessagePack: smaller and faster, but not human-readable
- Pickle: Python-native, but a security risk with poor cross-version compatibility
**Rationale**:
- Readable, which helps debugging and monitoring
- Language-agnostic (other services may read it in the future)
- Python's `json` module is fast enough
**Trade-offs**:
- JSON is larger (roughly 1.5-2x the size of MessagePack)
- If the data grows too large, consider MessagePack or gzip compression
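The gzip option above is easy to quantify before committing to it. A quick sketch (the rows here are fabricated stand-ins, not real `DW_PJ_LOT_V` records):

```python
import gzip
import json

# Fabricated rows standing in for DW_PJ_LOT_V records
rows = [
    {"LOT_ID": f"LOT{i:06d}", "WORKCENTER": "WC01", "QTY": 25, "HOLD_FLAG": "N"}
    for i in range(1000)
]

raw = json.dumps(rows).encode("utf-8")
compressed = gzip.compress(raw)

# Tabular JSON with repeated keys compresses very well
ratio = len(compressed) / len(raw)
print(f"raw={len(raw)} bytes, gzip={len(compressed)} bytes, ratio={ratio:.2f}")
```

Real rows with more varied values will compress less than this synthetic sample, so measure on actual data.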
---
### Decision 3: Background task implementation
**Choice**: a Python threading background thread
**Alternatives**:
- Celery: powerful, but architecturally complex and requires a broker
- APScheduler: an extra dependency; overkill for a simple periodic task
- System cron: decoupled from the app; deployment and monitoring get more complicated
**Rationale**:
- The project already has a similar pattern (the keepalive thread in `database.py`)
- No extra dependencies
- Tied to the application lifecycle; starts and stops with the app
**Implementation**:
```python
# cache_updater.py
import threading

class CacheUpdater:
    def __init__(self, interval=600):
        self.interval = interval
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self):
        while not self._stop_event.wait(self.interval):
            self._check_and_update()
```
---
### Decision 4: Redis connection management
**Choice**: use the connection pool built into `redis-py`
**Configuration**:
```python
import redis

redis_client = redis.Redis.from_url(
    REDIS_URL,
    decode_responses=True,
    socket_timeout=5,
    socket_connect_timeout=5,
    retry_on_timeout=True,
    health_check_interval=30,
)
```
**Rationale**:
- `redis-py` uses a connection pool by default; no extra configuration needed
- Built-in retry and health-check mechanisms
- Simple to integrate with the existing Flask application
---
### Decision 5: Worker stability strategy
**Choice**: a combined strategy
1. **SQLAlchemy call_timeout**: set a 55-second timeout on connection checkout
2. **Gunicorn max_requests**: restart each worker after 500-1000 requests
3. **Health check endpoint**: for external monitoring systems
**Implementation**:
```python
# database.py - set the timeout when a connection is checked out
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    dbapi_conn.call_timeout = 55000  # 55 seconds, in milliseconds
```
```python
# gunicorn.conf.py
max_requests = 1000
max_requests_jitter = 100
timeout = 65  # > call_timeout
```
**Rationale**:
- call_timeout guarantees that a single query cannot hang forever
- max_requests restarts workers periodically, avoiding state buildup (memory leaks, connection problems)
- the health check supports monitoring tools such as Kubernetes/systemd
---
### Decision 6: Degradation strategy
**Choice**: automatically fall back to querying Oracle directly when Redis is unavailable
**Logic**:
```python
def get_wip_data():
    if redis_enabled and redis_available():
        data = get_from_redis()
        if data:
            return data
    # Fallback: query Oracle directly
    return query_oracle_directly()
```
**Rationale**:
- Keeps the service available; a Redis failure must not take the whole service down
- Degraded mode is slower but fully functional
---
### Decision 7: Redis key namespace
**Choice**: a configurable prefix, `{REDIS_KEY_PREFIX}:`
**Default prefix**: `mes_wip`
**Key structure**:
| Key | Purpose |
|-----|---------|
| `mes_wip:meta:sys_date` | SYS_DATE of the Oracle data |
| `mes_wip:meta:updated_at` | cache update time (ISO 8601) |
| `mes_wip:data` | full table data (JSON) |
**Rationale**:
- The prefix is configurable via an environment variable, so multiple environments/projects can share one Redis
- A clear structure makes management and cleanup easy
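A small helper keeps the prefix logic in one place; this is a sketch (the function name and constants are illustrative, not from the repo):

```python
def make_key(prefix: str, *parts: str) -> str:
    """Build a namespaced Redis key, e.g. make_key('mes_wip', 'meta', 'sys_date')."""
    return ":".join((prefix, *parts))

# The three keys from the table above, under the default prefix
DATA_KEY = make_key("mes_wip", "data")
SYS_DATE_KEY = make_key("mes_wip", "meta", "sys_date")
UPDATED_AT_KEY = make_key("mes_wip", "meta", "updated_at")
```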
## Risks / Trade-offs
### Risk 1: Performance problems from large data volume
**Risk**: `DW_PJ_LOT_V` is large, making JSON serialization/deserialization slow.
**Mitigations**:
- Confirm the data volume before implementing: `SELECT COUNT(*) FROM DWH.DW_PJ_LOT_V`
- If it exceeds 100k rows, consider:
  - MessagePack instead of JSON
  - gzip compression
  - caching only the required columns
---
### Risk 2: Redis runs out of memory
**Risk**: table data plus Redis overhead exceeds the configured memory limit.
**Mitigations**:
- Configure `maxmemory-policy allkeys-lru` so old data is evicted automatically
- Monitor Redis memory usage
- Reserve about 2x the cache size in memory
---
### Risk 3: Inconsistent data during cache updates
**Risk**: while the background task updates the cache, the API may read partially updated data.
**Mitigations**:
- Use Redis MULTI/EXEC for atomic updates
- Or double-buffer: write to new keys, then switch once complete
```python
# Atomic update
pipe = redis_client.pipeline()
pipe.set(f"{prefix}:data", new_data)
pipe.set(f"{prefix}:meta:sys_date", new_sys_date)
pipe.set(f"{prefix}:meta:updated_at", now)
pipe.execute()
```
---
### Risk 4: The background thread dies unexpectedly
**Risk**: the cache_updater thread terminates on an uncaught exception.
**Mitigations**:
- Wrap the worker body in try-except
- Log the error
- Periodically check that the thread is still alive
---
### Risk 5: No cache on first startup
**Risk**: Redis is empty at application startup, so the first request hits Oracle.
**Mitigations**:
- Run one cache update immediately at startup
- Or accept the latency of the first request (an acceptable trade-off)
## Migration Plan
### Phase 1: Infrastructure (Day 1)
1. Install the Redis service
```bash
sudo apt install redis-server
sudo systemctl enable redis-server
```
2. Update `requirements.txt`
```
redis>=5.0.0
hiredis>=2.0.0  # optional, performance optimization
```
3. Add environment variables to `.env`
```
REDIS_URL=redis://localhost:6379/0
REDIS_ENABLED=true
REDIS_KEY_PREFIX=mes_wip
CACHE_CHECK_INTERVAL=600
```
### Phase 2: Code changes (Day 2-3)
1. Add `core/redis_client.py` - Redis connection management
2. Rewrite `core/cache.py` - table-level cache
3. Add `core/cache_updater.py` - background update task
4. Modify `core/database.py` - add call_timeout
5. Modify `services/wip_service.py` - read from the cache
6. Add `routes/health_routes.py` - health check
7. Modify `gunicorn.conf.py` - add max_requests
### Phase 3: Testing (Day 4)
1. Unit tests: cache reads/writes, degradation logic
2. Integration tests: API response correctness
3. Performance tests: compare response times before and after caching
4. Degradation test: stop Redis and confirm the fallback works
### Phase 4: Deployment (Day 5)
1. Deploy the Redis service
2. Deploy the application update
3. Monitor Redis memory and application logs
4. Confirm the Oracle query rate has dropped
### Rollback Strategy
To roll back:
1. Set `REDIS_ENABLED=false` and restart the application
2. The application automatically falls back to querying Oracle directly
3. No code rollback needed; the feature is fully backward compatible
## Open Questions
### Q1: How large is DW_PJ_LOT_V?
**To confirm**: run `SELECT COUNT(*) FROM DWH.DW_PJ_LOT_V` to check the row count and data size.
**Impact**:
- < 10k rows: JSON is fine
- 10k-100k rows: may need performance tuning
- > 100k rows: consider MessagePack, or cache only the required columns
---
### Q2: Should only specific columns be cached?
**Current plan**: cache every column of the table.
**Considerations**:
- Caching only the columns the APIs need would reduce memory use
- But we must make sure no column is missed
---
### Q3: Should the frontend refresh interval change?
**Current state**: the frontend refreshes every 10 minutes, matching the background task's check interval.
**Considerations**:
- A more frequent refresh (e.g. 5 minutes) would make the cache benefit more visible
- The current interval is sufficient, so no change for now

View File

@@ -0,0 +1,203 @@
## Why
Every WIP Dashboard query (WIP Overview, WIP Detail, Hold Detail) reads the same Oracle view, `DWH.DW_PJ_LOT_V`, which the DWH refreshes every **20 minutes**.
### Current problems
1. **Frontend auto-refresh**
   - `wip_overview.html:808` - auto-refreshes every 10 minutes
   - `wip_detail.html:756` - auto-refreshes every 10 minutes
   - Every refresh sends API requests that query Oracle directly
2. **Redundant queries**
   - Oracle data changes every 20 minutes, but the frontend refreshes every 10
   - With several concurrent users, the same data is queried repeatedly
   - This creates unnecessary Oracle query load
3. **Worker hangs**
   - Gunicorn runs gthread workers (2 workers × 4 threads)
   - SQLAlchemy pool connections have **no `call_timeout`** (see `database.py:46-59`)
   - gthread's timeout mechanism does not apply to worker threads (the heartbeat is sent by the main thread)
   - A hung worker never recovers on its own; the service must be restarted manually
## What Changes
### Redis cache layer (table-level cache)
Cache the entire `DW_PJ_LOT_V` table in Redis rather than caching per-API query results:
- **A background task checks** Oracle's `SYS_DATE` **every 10 minutes**
- Only when `SYS_DATE` changes is the whole `DW_PJ_LOT_V` table reloaded into Redis
- All WIP APIs read the full dataset from Redis and filter/aggregate in the Python layer
- When Redis is unavailable, automatically fall back to querying Oracle directly
**Benefits**:
- Simple cache logic - only one full copy of the data to maintain
- Good data consistency - all APIs share the same cache
- Minimal Oracle load - only the background task queries Oracle
### Worker stability hardening
- Fix the missing `call_timeout` on SQLAlchemy pool connections so query timeouts actually take effect
- Harden the Gunicorn configuration with `max_requests` so workers restart periodically and avoid state buildup
- Add a `/health` endpoint so external monitoring systems can detect the service state
## Capabilities
### New Capabilities
- `redis-cache`: the WIP table-level cache layer, including:
  - a background task that checks `SYS_DATE` every 10 minutes and loads the full `DW_PJ_LOT_V`
  - Redis reads with Python-layer filtering/aggregation
  - a degradation path: fall back when Redis is unavailable
- `health-check`: a service health endpoint that checks database and Redis connectivity
### Modified Capabilities
- (no existing specs need changes)
## Impact
### Cache architecture
```
┌──────────────────────────────────────────────────────────────┐
│ Table-level cache flow                                       │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│ cache_updater (background task, runs every 10 minutes)       │
│   │                                                          │
│   ├─1─▶ SELECT MAX(SYS_DATE) FROM DW_PJ_LOT_V                │
│   │                                                          │
│   ├─2─▶ compare against mes_wip:meta:sys_date in Redis       │
│   │                                                          │
│   └─3─▶ if changed:                                          │
│           SELECT * FROM DW_PJ_LOT_V → store in Redis         │
│                                                              │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│ API request handling                                         │
│   │                                                          │
│   ├─1─▶ read mes_wip:data from Redis (full table)            │
│   │                                                          │
│   ├─2─▶ filter/aggregate in Python (pandas)                  │
│   │                                                          │
│   └─3─▶ return the computed result                           │
│                                                              │
└──────────────────────────────────────────────────────────────┘
```
### API endpoints to modify
All of the following APIs query `DWH.DW_PJ_LOT_V` and must be changed to read from Redis and compute in Python:
| Route | Service function | Description |
|-------|------------------|-------------|
| `/api/wip/overview/summary` | `get_wip_summary()` | WIP summary KPIs |
| `/api/wip/overview/matrix` | `get_wip_matrix()` | work center × product line matrix |
| `/api/wip/overview/hold` | `get_wip_hold_summary()` | hold summary |
| `/api/wip/detail/<workcenter>` | `get_wip_detail()` | per-work-center lot details |
| `/api/wip/hold-detail/summary` | `get_hold_detail_summary()` | hold reason statistics |
| `/api/wip/hold-detail/distribution` | `get_hold_detail_distribution()` | hold distribution |
| `/api/wip/hold-detail/lots` | `get_hold_detail_lots()` | hold lot list |
| `/api/wip/meta/workcenters` | - | work center list |
| `/api/wip/meta/packages` | - | product line list |
| `/api/wip/meta/search` | - | lot/work-order search |
**Related files**:
- `src/mes_dashboard/services/wip_service.py` - main query logic (to be refactored to read from Redis)
- `src/mes_dashboard/services/filter_cache.py` - work center group cache
- `src/mes_dashboard/routes/wip_routes.py` - WIP API routes
- `src/mes_dashboard/routes/hold_routes.py` - Hold API routes
### Code changes
| File | Change | Description |
|------|--------|-------------|
| `src/mes_dashboard/core/database.py` | modify | add call_timeout to the SQLAlchemy pool |
| `src/mes_dashboard/core/redis_client.py` | add | Redis connection management |
| `src/mes_dashboard/core/cache.py` | rewrite | table-level cache (replaces the current NoOpCache) |
| `src/mes_dashboard/core/cache_updater.py` | add | background task: check SYS_DATE and load the full table |
| `src/mes_dashboard/services/wip_service.py` | refactor | read from Redis + compute with pandas |
| `src/mes_dashboard/routes/health_routes.py` | add | health check endpoint |
| `gunicorn.conf.py` | modify | add max_requests settings |
### New dependencies
| Package | Version | Purpose |
|---------|---------|---------|
| `redis` | >= 5.0 | Redis client |
| `hiredis` | >= 2.0 | high-performance Redis parser (optional) |
### Environment variables
| Variable | Description | Default |
|----------|-------------|---------|
| `REDIS_URL` | Redis connection string | `redis://localhost:6379/0` |
| `REDIS_ENABLED` | enable the Redis cache | `true` |
| `REDIS_KEY_PREFIX` | Redis key prefix, separating projects/use cases | `mes_wip` |
| `CACHE_CHECK_INTERVAL` | SYS_DATE check interval in seconds | `600` (10 minutes) |
### Redis key naming convention
To separate this project from other uses of the same Redis, all keys share a common prefix:
| Key | Description |
|-----|-------------|
| `{prefix}:meta:sys_date` | SYS_DATE version of the current cache |
| `{prefix}:meta:updated_at` | cache update time |
| `{prefix}:data` | full DW_PJ_LOT_V table data (JSON or MessagePack) |
**Example** (prefix = `mes_wip`):
```
mes_wip:meta:sys_date   → "2024-01-15 10:30:00"
mes_wip:meta:updated_at → "2024-01-15 10:35:22"
mes_wip:data            → [full table data as JSON]
```
### Data volume assessment
The size of `DW_PJ_LOT_V` needs to be confirmed:
- Estimated row count: TBD
- Estimated size: TBD (as JSON)
- Redis memory requirement: TBD
**Recommendation**: run `SELECT COUNT(*) FROM DWH.DW_PJ_LOT_V` before implementing
### Infrastructure
#### Redis installation (native Linux install)
```bash
# Ubuntu/Debian
sudo apt update
sudo apt install redis-server

# Enable and start on boot
sudo systemctl enable redis-server
sudo systemctl start redis-server

# Verify the installation
redis-cli ping  # should return PONG
```
**Recommended configuration** (`/etc/redis/redis.conf`):
```conf
# Bind to localhost (adjust if remote access is needed)
bind 127.0.0.1
# Memory limit (tune to the data volume; reserve ~2x the cache size)
maxmemory 512mb
maxmemory-policy allkeys-lru
# Persistence (optional; cache data need not be persisted)
save ""
appendonly no
```
#### Why not Docker
- Avoids extra container-management complexity
- A native install performs better (no container networking overhead)
- Simpler integration with the existing Linux environment
- Appropriate for a single-purpose cache service

View File

@@ -0,0 +1,124 @@
## ADDED Requirements
### Requirement: Health Check Endpoint
The system SHALL provide a `/health` endpoint that reports service health.
#### Scenario: All services healthy
- **WHEN** `GET /health` is called and both Oracle and Redis are healthy
- **THEN** the system SHALL return HTTP 200
- **AND** the response body SHALL be:
```json
{
  "status": "healthy",
  "services": {
    "database": "ok",
    "redis": "ok"
  }
}
```
#### Scenario: Database unhealthy
- **WHEN** `GET /health` is called and the Oracle connection fails
- **THEN** the system SHALL return HTTP 503
- **AND** the response body SHALL contain:
```json
{
  "status": "unhealthy",
  "services": {
    "database": "error",
    "redis": "ok"
  },
  "errors": ["Database connection failed: <error message>"]
}
```
#### Scenario: Redis unhealthy but service degraded
- **WHEN** `GET /health` is called and the Redis connection fails but Oracle is healthy
- **THEN** the system SHALL return HTTP 200 (the service can run degraded)
- **AND** the response body SHALL contain:
```json
{
  "status": "degraded",
  "services": {
    "database": "ok",
    "redis": "error"
  },
  "warnings": ["Redis unavailable, running in fallback mode"]
}
```
#### Scenario: Redis disabled
- **WHEN** `GET /health` is called and `REDIS_ENABLED=false`
- **THEN** `services.redis` in the response body SHALL be `"disabled"`
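The status/HTTP-code mapping across these scenarios reduces to one pure function. A sketch (the real handler lives in `health_routes.py` and may be structured differently):

```python
def health_status(db_ok: bool, redis_ok: bool, redis_enabled: bool) -> tuple[str, int]:
    """Map check results to the (status, http_code) pairs described above."""
    if not db_ok:
        return "unhealthy", 503
    if redis_enabled and not redis_ok:
        # Oracle fallback keeps the service usable, so this is not a failure
        return "degraded", 200
    return "healthy", 200
```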
---
### Requirement: Database Health Check
The health check SHALL verify the Oracle database connection.
#### Scenario: Database ping succeeds
- **WHEN** the database health check runs
- **THEN** the system SHALL execute `SELECT 1 FROM DUAL`
- **AND** mark database as `ok` if the query succeeds
#### Scenario: Database ping timeout
- **WHEN** the database query takes longer than 5 seconds
- **THEN** the system SHALL mark database as `error`
- **AND** log the timeout
---
### Requirement: Redis Health Check
The health check SHALL verify the Redis connection (when REDIS_ENABLED=true).
#### Scenario: Redis ping succeeds
- **WHEN** the Redis health check runs
- **THEN** the system SHALL issue the Redis `PING` command
- **AND** mark redis as `ok` on a `PONG` response
#### Scenario: Redis ping fails
- **WHEN** the Redis `PING` command fails or times out
- **THEN** the system SHALL mark redis as `error`
- **AND** the service status SHALL be `degraded` (not `unhealthy`)
---
### Requirement: Cache Status in Health Check
The health check SHALL include cache status information.
#### Scenario: Cache status included
- **WHEN** `GET /health` is called and the cache is available
- **THEN** the response body SHALL include a `cache` section:
```json
{
  "cache": {
    "enabled": true,
    "sys_date": "2024-01-15 10:30:00",
    "updated_at": "2024-01-15 10:35:22"
  }
}
```
#### Scenario: Cache not populated
- **WHEN** `GET /health` is called and Redis is available but the cache has not been loaded yet
- **THEN** `cache.sys_date` in the response body SHALL be `null`
---
### Requirement: Health Check Performance
The health check SHALL respond quickly and not affect service performance.
#### Scenario: Response within timeout
- **WHEN** `GET /health` is called
- **THEN** the system SHALL respond within 10 seconds
- **AND** each individual check SHALL time out within 5 seconds
#### Scenario: No authentication required
- **WHEN** `GET /health` is called
- **THEN** the system SHALL NOT require authentication
- **AND** SHALL NOT write to the access log (to avoid log noise)

View File

@@ -0,0 +1,138 @@
## ADDED Requirements
### Requirement: Cache Updater Background Task
The system SHALL provide a background task that periodically checks the `SYS_DATE` of Oracle's `DW_PJ_LOT_V` and updates the Redis cache.
#### Scenario: SYS_DATE changed triggers cache update
- **WHEN** the background task runs and Oracle's `SYS_DATE` differs from the version stored in Redis
- **THEN** the system SHALL load the entire `DW_PJ_LOT_V` table into Redis
- **AND** update `{prefix}:meta:sys_date` to the new SYS_DATE
- **AND** update `{prefix}:meta:updated_at` to the current time
#### Scenario: SYS_DATE unchanged skips update
- **WHEN** the background task runs and Oracle's `SYS_DATE` matches the version stored in Redis
- **THEN** the system SHALL skip the cache update
- **AND** log at debug level
#### Scenario: Background task runs at configured interval
- **WHEN** the application has started
- **THEN** the background task SHALL run every `CACHE_CHECK_INTERVAL` seconds (default 600)
#### Scenario: Initial cache load on startup
- **WHEN** the application starts and Redis holds no cached data
- **THEN** the system SHALL run a cache update immediately
---
### Requirement: Redis Data Storage
The system SHALL store the `DW_PJ_LOT_V` table data in Redis as JSON.
#### Scenario: Data stored with correct keys
- **WHEN** a cache update completes
- **THEN** Redis SHALL contain the following keys:
  - `{prefix}:meta:sys_date` - SYS_DATE of the Oracle data
  - `{prefix}:meta:updated_at` - cache update time (ISO 8601)
  - `{prefix}:data` - full table data (JSON array)
#### Scenario: Atomic update with pipeline
- **WHEN** a cache update runs
- **THEN** the system SHALL use a Redis pipeline so that all keys update atomically
---
### Requirement: Cache Read for WIP Queries
All WIP API queries SHALL read from the Redis cache first.
#### Scenario: Cache hit returns data from Redis
- **WHEN** the API receives a WIP query and the Redis cache is available
- **THEN** the system SHALL read `{prefix}:data` from Redis
- **AND** filter/aggregate with pandas
- **AND** return the computed result
#### Scenario: Cache includes SYS_DATE in response
- **WHEN** the API returns data from the cache
- **THEN** the response SHALL include a `dataUpdateDate` field set to the cached SYS_DATE
---
### Requirement: Fallback to Oracle on Cache Miss
When Redis is unavailable or holds no cached data, the system SHALL automatically fall back to querying Oracle directly.
#### Scenario: Redis unavailable triggers fallback
- **WHEN** the Redis connection fails or times out
- **THEN** the system SHALL query Oracle `DW_PJ_LOT_V` directly
- **AND** log a warning
#### Scenario: Cache empty triggers fallback
- **WHEN** Redis is available but `{prefix}:data` does not exist
- **THEN** the system SHALL query Oracle `DW_PJ_LOT_V` directly
#### Scenario: REDIS_ENABLED=false disables cache
- **WHEN** the environment variable `REDIS_ENABLED` is `false`
- **THEN** the system SHALL bypass Redis entirely and query Oracle directly
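Taken together, the three fallback scenarios amount to a source-selection rule like this (an illustrative sketch, not the actual service code):

```python
def choose_source(redis_enabled: bool, redis_up: bool, cache_populated: bool) -> str:
    """Return 'redis' only when the cache is enabled, reachable, and populated."""
    if redis_enabled and redis_up and cache_populated:
        return "redis"
    return "oracle"
```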
---
### Requirement: Redis Connection Management
The system SHALL manage Redis connections with a connection pool.
#### Scenario: Connection pool with health check
- **WHEN** the application initializes the Redis connection
- **THEN** the system SHALL configure:
  - `socket_timeout=5`
  - `socket_connect_timeout=5`
  - `retry_on_timeout=True`
  - `health_check_interval=30`
#### Scenario: Connection from URL
- **WHEN** the application reads the `REDIS_URL` environment variable
- **THEN** the system SHALL connect to Redis using that URL
- **AND** the default SHALL be `redis://localhost:6379/0`
---
### Requirement: Configurable Key Prefix
The system SHALL support a configurable Redis key prefix to separate projects/environments.
#### Scenario: Custom prefix from environment
- **WHEN** the environment variable `REDIS_KEY_PREFIX` is set to `my_app`
- **THEN** all Redis keys SHALL use the `my_app:` prefix
#### Scenario: Default prefix
- **WHEN** the environment variable `REDIS_KEY_PREFIX` is unset
- **THEN** the system SHALL use the default prefix `mes_wip`
---
### Requirement: SQLAlchemy Connection Timeout
Connections in the SQLAlchemy pool SHALL have a query timeout set.
#### Scenario: call_timeout set on checkout
- **WHEN** a connection is checked out from the pool
- **THEN** the system SHALL set `call_timeout = 55000` milliseconds
#### Scenario: Query exceeds timeout
- **WHEN** an Oracle query runs longer than 55 seconds
- **THEN** the system SHALL raise a timeout exception
- **AND** the connection SHALL be invalidated
---
### Requirement: Gunicorn Worker Lifecycle
The Gunicorn configuration SHALL restart workers periodically.
#### Scenario: max_requests triggers restart
- **WHEN** a worker has handled `max_requests` requests (default 1000)
- **THEN** Gunicorn SHALL gracefully restart that worker
#### Scenario: max_requests_jitter prevents simultaneous restart
- **WHEN** several workers approach `max_requests` at the same time
- **THEN** each worker's actual restart threshold SHALL include a random offset between 0 and `max_requests_jitter`
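The jitter behaviour can be sketched as follows: each worker draws its own threshold once, so restarts spread out over time (illustrative only; Gunicorn implements this internally):

```python
import random

def restart_threshold(max_requests: int, jitter: int, rng: random.Random) -> int:
    """Effective per-worker restart threshold: max_requests plus a random offset."""
    return max_requests + rng.randint(0, jitter)

# With max_requests=1000 and jitter=100, each worker restarts
# somewhere between request 1000 and request 1100.
rng = random.Random()
thresholds = [restart_threshold(1000, 100, rng) for _ in range(4)]
```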

View File

@@ -0,0 +1,82 @@
## 1. Infrastructure
- [x] 1.1 Install the Redis service on the Linux server
- [x] 1.2 Configure Redis (default configuration; no memory limit needed)
- [x] 1.3 Update `requirements.txt` with `redis>=5.0.0` and `hiredis>=2.0.0`
- [x] 1.4 Update `.env.example` with the Redis environment variables
## 2. Redis Connection Management
- [x] 2.1 Create `src/mes_dashboard/core/redis_client.py`
- [x] 2.2 Implement `get_redis_client()` with connection pooling and health checks
- [x] 2.3 Implement `redis_available()` to check the Redis connection
- [x] 2.4 Read the environment variables `REDIS_URL`, `REDIS_ENABLED`, `REDIS_KEY_PREFIX`
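Task 2.3's availability check might look like this (a sketch; the real implementation presumably wraps the shared `redis-py` client):

```python
def redis_available(client) -> bool:
    """True when the client answers PING; any error counts as unavailable."""
    if client is None:
        return False
    try:
        return bool(client.ping())
    except Exception:
        return False
```

Treating every exception as "unavailable" is deliberate: the callers only need a yes/no to decide between the cache and the Oracle fallback.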
## 3. Cache Update Background Task
- [x] 3.1 Create `src/mes_dashboard/core/cache_updater.py`
- [x] 3.2 Implement the `CacheUpdater` class with start/stop methods
- [x] 3.3 Implement `_check_sys_date()` to query Oracle `MAX(SYS_DATE)`
- [x] 3.4 Implement `_load_full_table()` to load the entire `DW_PJ_LOT_V`
- [x] 3.5 Implement `_update_redis_cache()` with an atomic pipeline update
- [x] 3.6 Integrate in `app.py`: start the background task at application startup
## 4. Cache Read and Degradation
- [x] 4.1 Rewrite `src/mes_dashboard/core/cache.py` as a table-level cache
- [x] 4.2 Implement `get_cached_wip_data()` to read the full table from Redis
- [x] 4.3 Implement `get_cached_sys_date()` to read the cached SYS_DATE
- [x] 4.4 Implement the degradation logic: fall back to Oracle when Redis is unavailable
## 5. WIP Service Refactor
- [x] 5.1 Change `get_wip_summary()` to use cached data + pandas
- [x] 5.2 Change `get_wip_matrix()` to use cached data + pandas
- [x] 5.3 Change `get_wip_hold_summary()` to use cached data + pandas
- [x] 5.4 Change `get_wip_detail()` to use cached data + pandas filtering/pagination
- [x] 5.5 Change `get_hold_detail_summary()` to use cached data
- [x] 5.6 Change `get_hold_detail_distribution()` to use cached data
- [x] 5.7 Change `get_hold_detail_lots()` to use cached data + pandas filtering/pagination
- [x] 5.8 Change the meta endpoints (workcenters, packages, search) to use cached data
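Once the rows come from the cache, the service layer is ordinary data wrangling. A pure-Python sketch of a summary computation (the real code uses pandas, and these field names are made up for illustration):

```python
def wip_summary(rows: list[dict]) -> dict:
    """Aggregate cached DW_PJ_LOT_V-style rows into summary KPIs."""
    total_qty = sum(r["QTY"] for r in rows)
    hold_lots = [r for r in rows if r.get("HOLD_FLAG") == "Y"]
    return {
        "lot_count": len(rows),
        "total_qty": total_qty,
        "hold_lot_count": len(hold_lots),
        "workcenters": sorted({r["WORKCENTER"] for r in rows}),
    }
```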
## 6. SQLAlchemy Connection Timeout Fix
- [x] 6.1 Modify `src/mes_dashboard/core/database.py`
- [x] 6.2 Add a checkout event handler in `_register_pool_events()`
- [x] 6.3 Set `dbapi_conn.call_timeout = 55000`
- [x] 6.4 Timeout mechanism configured (verified in production)
## 7. Gunicorn Configuration Hardening
- [x] 7.1 Add `max_requests = 1000` to `gunicorn.conf.py`
- [x] 7.2 Add `max_requests_jitter = 100`
- [x] 7.3 Confirm `timeout = 65` (greater than the 55-second call_timeout)
## 8. Health Check Endpoint
- [x] 8.1 Create `src/mes_dashboard/routes/health_routes.py`
- [x] 8.2 Implement the `GET /health` endpoint
- [x] 8.3 Implement `check_database()` (SELECT 1 FROM DUAL)
- [x] 8.4 Implement `check_redis()` (PING)
- [x] 8.5 Implement `get_cache_status()` (read the meta keys)
- [x] 8.6 Register the health blueprint in `app.py`
- [x] 8.7 Exempt the health check from authentication
## 9. Testing
- [x] 9.1 Unit tests: Redis connection management (mocked Redis)
- [x] 9.2 Unit tests: cache update logic
- [x] 9.3 Unit tests: degradation mechanism
- [x] 9.4 Integration tests: API response correctness
- [x] 9.5 Performance tests: response times before and after caching
- [x] 9.6 E2E tests: full end-to-end suite (all 17 tests passing)
## 10. Deployment and Verification
- [x] 10.1 Deploy the Redis service to production
- [x] 10.2 Set the production environment variables
- [x] 10.3 Deploy the application update
- [x] 10.4 Monitor Redis memory usage (currently 17MB)
- [x] 10.5 Confirm the Oracle query rate dropped (logs show cache hits)
- [x] 10.6 Confirm the `/health` endpoint works
- [x] 10.7 Add the health status indicator to the frontend UI

View File

@@ -7,3 +7,5 @@ python-dotenv>=1.0.0
gunicorn>=21.2.0
waitress>=2.1.2; platform_system=="Windows"
requests>=2.28.0
redis>=5.0.0
hiredis>=2.0.0

View File

@@ -11,6 +11,7 @@ set -euo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
CONDA_ENV="mes-dashboard"
PYTHON_VERSION="3.11"
REDIS_CONF="/etc/redis/redis.conf"
# Colors for output
RED='\033[0;31m'
@@ -61,6 +62,48 @@ check_prerequisites() {
source "$(conda info --base)/etc/profile.d/conda.sh"
}
check_redis() {
    log_info "Checking Redis installation..."

    # Check if redis-server is installed
    if ! command -v redis-server &> /dev/null; then
        log_error "Redis server not found."
        log_info "Install with: sudo apt install redis-server"
        exit 1
    fi
    log_success "Redis server found"

    # Check if redis-cli is installed
    if ! command -v redis-cli &> /dev/null; then
        log_error "Redis CLI not found."
        exit 1
    fi
    log_success "Redis CLI found"

    # Check if Redis service is enabled
    if systemctl is-enabled redis-server &>/dev/null; then
        log_success "Redis service is enabled"
    else
        log_warn "Redis service is not enabled for auto-start"
        log_info "Enable with: sudo systemctl enable redis-server"
    fi

    # Check if Redis is running
    if systemctl is-active redis-server &>/dev/null; then
        log_success "Redis service is running"
    else
        log_warn "Redis service is not running"
        log_info "Start with: sudo systemctl start redis-server"
    fi

    # Test Redis connectivity
    if redis-cli ping &>/dev/null; then
        log_success "Redis connectivity OK (PONG received)"
    else
        log_warn "Cannot connect to Redis (service may need to be started)"
    fi
}
setup_conda_env() {
log_info "Setting up conda environment..."
@@ -207,6 +250,7 @@ main() {
echo ""
check_prerequisites
check_redis
setup_conda_env
install_dependencies
setup_env_file

View File

@@ -19,6 +19,9 @@ STARTUP_LOG="${LOG_DIR}/startup.log"
DEFAULT_PORT="${GUNICORN_BIND:-0.0.0.0:8080}"
PORT=$(echo "$DEFAULT_PORT" | cut -d: -f2)
# Redis configuration
REDIS_ENABLED="${REDIS_ENABLED:-true}"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
@@ -149,6 +152,107 @@ with engine.connect() as conn:
fi
}
# ============================================================
# Redis Management Functions
# ============================================================
check_redis() {
    if [ "$REDIS_ENABLED" != "true" ]; then
        log_info "Redis is disabled (REDIS_ENABLED=${REDIS_ENABLED})"
        return 0
    fi

    if ! command -v redis-cli &> /dev/null; then
        log_warn "Redis CLI not found (Redis features will be disabled)"
        return 0
    fi

    if redis-cli ping &>/dev/null; then
        log_success "Redis connection OK"
        return 0
    else
        log_warn "Redis not responding (will attempt to start)"
        return 1
    fi
}

start_redis() {
    if [ "$REDIS_ENABLED" != "true" ]; then
        return 0
    fi
    if ! command -v redis-cli &> /dev/null; then
        return 0
    fi

    # Check if Redis is already running
    if redis-cli ping &>/dev/null; then
        log_success "Redis is already running"
        return 0
    fi

    # Try to start Redis via systemctl
    if command -v systemctl &> /dev/null; then
        log_info "Starting Redis service..."
        if sudo systemctl start redis-server 2>/dev/null; then
            sleep 1
            if redis-cli ping &>/dev/null; then
                log_success "Redis service started"
                return 0
            fi
        fi
    fi

    log_warn "Could not start Redis (fallback mode will be used)"
    return 0
}

stop_redis() {
    if [ "$REDIS_ENABLED" != "true" ]; then
        return 0
    fi
    if ! command -v redis-cli &> /dev/null; then
        return 0
    fi

    # Check if Redis is running
    if ! redis-cli ping &>/dev/null; then
        log_info "Redis is not running"
        return 0
    fi

    # Stop Redis via systemctl
    if command -v systemctl &> /dev/null; then
        log_info "Stopping Redis service..."
        if sudo systemctl stop redis-server 2>/dev/null; then
            log_success "Redis service stopped"
            return 0
        fi
    fi

    log_warn "Could not stop Redis service"
    return 0
}

redis_status() {
    if [ "$REDIS_ENABLED" != "true" ]; then
        echo -e " Redis: ${YELLOW}DISABLED${NC}"
        return 0
    fi
    if ! command -v redis-cli &> /dev/null; then
        echo -e " Redis: ${YELLOW}NOT INSTALLED${NC}"
        return 0
    fi

    if redis-cli ping &>/dev/null; then
        local info=$(redis-cli info memory 2>/dev/null | grep "used_memory_human" | cut -d: -f2 | tr -d '\r')
        echo -e " Redis: ${GREEN}RUNNING${NC} (Memory: ${info:-unknown})"
    else
        echo -e " Redis: ${RED}STOPPED${NC}"
    fi
}
run_all_checks() {
log_info "Running environment checks..."
echo ""
@@ -158,6 +262,7 @@ run_all_checks() {
check_env_file
check_port || return 1
check_database
check_redis
echo ""
log_success "All checks passed"
@@ -237,6 +342,10 @@ do_start() {
run_all_checks || return 1
echo ""
# Start Redis if enabled
start_redis
log_info "Starting ${APP_NAME} server..."
ensure_dirs
@@ -340,6 +449,9 @@ do_restart() {
}
do_status() {
# Load environment to get REDIS_ENABLED
load_env
echo ""
echo "=========================================="
echo " ${APP_NAME} Server Status"
@@ -348,13 +460,22 @@ do_status() {
if is_running; then
local pid=$(get_pid)
echo -e " Server: ${GREEN}RUNNING${NC}"
echo " PID: ${pid}"
echo " Port: ${PORT}"
echo " URL: http://localhost:${PORT}"
else
echo -e " Server: ${RED}STOPPED${NC}"
fi
# Show Redis status
redis_status
if is_running; then
echo ""
# Show process info
local pid=$(get_pid)
if command -v ps &>/dev/null; then
echo " Process Info:"
ps -p "$pid" -o pid,ppid,%cpu,%mem,etime,cmd --no-headers 2>/dev/null | \
@@ -368,7 +489,6 @@ do_status() {
tail -3 "$ERROR_LOG" 2>/dev/null | sed 's/^/ /'
fi
else
echo ""
echo " Start with: $0 start"
fi
@@ -424,13 +544,13 @@ show_help() {
echo " start [-f] Start the server (-f for foreground mode)"
echo " stop Stop the server gracefully"
echo " restart Restart the server"
echo " status Show server and Redis status"
echo " logs [type] View logs (access|error|follow|all)"
echo " check Run environment checks only"
echo " help Show this help message"
echo ""
echo "Examples:"
echo " $0 start # Start in background (with Redis)"
echo " $0 start -f # Start in foreground"
echo " $0 logs follow # Follow logs in real-time"
echo " $0 logs error 100 # Show last 100 error log lines"
@@ -439,6 +559,8 @@ show_help() {
echo " GUNICORN_BIND Bind address (default: 0.0.0.0:8080)"
echo " GUNICORN_WORKERS Number of workers (default: 1)"
echo " GUNICORN_THREADS Threads per worker (default: 4)"
echo " REDIS_ENABLED Enable Redis cache (default: true)"
echo " REDIS_URL Redis connection URL"
echo ""
}

View File

@@ -17,7 +17,9 @@ from mes_dashboard.core.permissions import is_admin_logged_in
from mes_dashboard.routes import register_routes
from mes_dashboard.routes.auth_routes import auth_bp
from mes_dashboard.routes.admin_routes import admin_bp
from mes_dashboard.routes.health_routes import health_bp
from mes_dashboard.services.page_registry import get_page_status, is_api_public
from mes_dashboard.core.cache_updater import start_cache_updater, stop_cache_updater
def _configure_logging(app: Flask) -> None:
@@ -69,13 +71,15 @@ def create_app(config_name: str | None = None) -> Flask:
with app.app_context():
get_engine()
start_keepalive() # Keep database connections alive
start_cache_updater() # Start Redis cache updater
# Register API routes
register_routes(app)
# Register auth, admin, and health routes
app.register_blueprint(auth_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(health_bp)
# ========================================================
# Permission Middleware
@@ -88,6 +92,10 @@ def create_app(config_name: str | None = None) -> Flask:
if request.endpoint == "static":
return None
# Health check endpoint - no auth required
if request.path == "/health":
return None
# API endpoints check
if request.path.startswith("/api/"):
if is_api_public():

View File

@@ -1,13 +1,33 @@
# -*- coding: utf-8 -*-
"""Cache abstraction for MES Dashboard.
Provides table-level caching for WIP data using Redis.
Falls back to Oracle direct query when Redis is unavailable.
"""
from __future__ import annotations
import io
import json
import logging
from typing import Any, Optional, Protocol
import pandas as pd
from flask import current_app
from mes_dashboard.config.constants import CACHE_TTL_DEFAULT
from mes_dashboard.core.redis_client import (
get_redis_client,
get_key,
redis_available,
REDIS_ENABLED
)
logger = logging.getLogger('mes_dashboard.cache')
# ============================================================
# Legacy Cache Backend Interface (for backwards compatibility)
# ============================================================
class CacheBackend(Protocol):
@@ -53,3 +73,116 @@ def make_cache_key(prefix: str, days_back: Optional[int] = None, filters: Option
"""Generate a cache key from prefix and parameters."""
filters_key = json.dumps(filters, sort_keys=True, ensure_ascii=False) if filters else ""
return f"{prefix}:{days_back}:{filters_key}"
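One detail worth noting in `make_cache_key`: `json.dumps(..., sort_keys=True)` canonicalizes the filters, so logically equal filter dicts always produce the same key. A standalone sketch of the same scheme:

```python
import json

def make_cache_key(prefix, days_back=None, filters=None):
    # sort_keys=True yields a canonical JSON string, so insertion
    # order of the filters dict cannot change the resulting key.
    filters_key = json.dumps(filters, sort_keys=True, ensure_ascii=False) if filters else ""
    return f"{prefix}:{days_back}:{filters_key}"

k1 = make_cache_key("wip", 7, {"site": "A1", "pkg": "BGA"})
k2 = make_cache_key("wip", 7, {"pkg": "BGA", "site": "A1"})
# k1 == k2 despite the different dict ordering
```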
# ============================================================
# WIP Table-Level Cache Functions
# ============================================================
def get_cached_wip_data() -> Optional[pd.DataFrame]:
"""Get cached WIP data from Redis.
Returns:
DataFrame with full DW_PJ_LOT_V data, or None if cache miss.
"""
if not REDIS_ENABLED:
return None
client = get_redis_client()
if client is None:
return None
try:
data_json = client.get(get_key("data"))
if data_json is None:
logger.debug("Cache miss: no data in Redis")
return None
# Use StringIO to wrap the JSON string for pd.read_json
df = pd.read_json(io.StringIO(data_json), orient='records')
logger.debug(f"Cache hit: loaded {len(df)} rows from Redis")
return df
except Exception as e:
logger.warning(f"Failed to read cache: {e}")
return None
def get_cached_sys_date() -> Optional[str]:
"""Get cached SYS_DATE from Redis.
Returns:
SYS_DATE string or None if not cached.
"""
if not REDIS_ENABLED:
return None
client = get_redis_client()
if client is None:
return None
try:
return client.get(get_key("meta:sys_date"))
except Exception as e:
logger.warning(f"Failed to get cached SYS_DATE: {e}")
return None
def get_cache_updated_at() -> Optional[str]:
"""Get cache update timestamp from Redis.
Returns:
ISO 8601 timestamp string or None.
"""
if not REDIS_ENABLED:
return None
client = get_redis_client()
if client is None:
return None
try:
return client.get(get_key("meta:updated_at"))
except Exception as e:
logger.warning(f"Failed to get cache updated_at: {e}")
return None
def is_cache_available() -> bool:
"""Check if WIP cache is available and populated.
Returns:
True if Redis has cached data.
"""
if not REDIS_ENABLED:
return False
client = get_redis_client()
if client is None:
return False
try:
return client.exists(get_key("data")) > 0
except Exception as e:
logger.warning(f"Failed to check cache availability: {e}")
return False
def get_wip_data_with_fallback(fallback_fn) -> pd.DataFrame:
"""Get WIP data from cache, falling back to Oracle if needed.
Args:
fallback_fn: Function to call for Oracle direct query.
Should return a DataFrame.
Returns:
DataFrame with WIP data (from cache or Oracle).
"""
# Try cache first
df = get_cached_wip_data()
if df is not None:
return df
# Fallback to Oracle
logger.info("Cache miss or unavailable, falling back to Oracle query")
return fallback_fn()
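The cache-first contract of `get_wip_data_with_fallback` can be exercised without Redis or Oracle; in this reduced sketch the cached getter and the fallback are stand-in lambdas returning plain lists:

```python
def get_wip_data_with_fallback(get_cached, fallback_fn):
    # Serve from cache when available; otherwise query the backing store.
    data = get_cached()
    return data if data is not None else fallback_fn()

rows = [{"WORKORDER": "WO123", "QTY": 100}]
hit = get_wip_data_with_fallback(lambda: rows, lambda: [])     # cache hit
miss = get_wip_data_with_fallback(lambda: None, lambda: rows)  # miss -> fallback
```

In the real module the getter is fixed to `get_cached_wip_data()` and the payloads are DataFrames, but the shape of the decision is the same.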

View File

@@ -0,0 +1,261 @@
# -*- coding: utf-8 -*-
"""Background task for updating WIP cache from Oracle to Redis."""
from __future__ import annotations
import json
import logging
import os
import threading
from datetime import datetime
from typing import Optional
import pandas as pd
from mes_dashboard.core.redis_client import (
get_redis_client,
get_key,
redis_available,
REDIS_ENABLED
)
from mes_dashboard.core.database import read_sql_df
logger = logging.getLogger('mes_dashboard.cache_updater')
# ============================================================
# Configuration
# ============================================================
CACHE_CHECK_INTERVAL = int(os.getenv('CACHE_CHECK_INTERVAL', '600')) # 10 minutes
WIP_VIEW = "DWH.DW_PJ_LOT_V"
# ============================================================
# Cache Updater Class
# ============================================================
class CacheUpdater:
"""Background task that periodically checks SYS_DATE and updates cache."""
def __init__(self, interval: int = CACHE_CHECK_INTERVAL):
"""Initialize cache updater.
Args:
interval: Check interval in seconds (default: 600)
"""
self.interval = interval
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._is_running = False
def start(self) -> None:
"""Start the background update thread."""
if not REDIS_ENABLED:
logger.info("Redis is disabled, cache updater will not start")
return
if self._thread is not None and self._thread.is_alive():
logger.warning("Cache updater is already running")
return
self._stop_event.clear()
self._thread = threading.Thread(
target=self._worker,
daemon=True,
name="cache-updater"
)
self._thread.start()
self._is_running = True
logger.info(f"Cache updater started (interval: {self.interval}s)")
def stop(self) -> None:
"""Stop the background update thread."""
if self._thread is None or not self._thread.is_alive():
return
self._stop_event.set()
self._thread.join(timeout=5)
self._is_running = False
logger.info("Cache updater stopped")
def is_running(self) -> bool:
"""Check if the updater is running."""
return self._is_running and self._thread is not None and self._thread.is_alive()
def force_update(self) -> bool:
"""Force an immediate cache update.
Returns:
True if update was successful.
"""
return self._check_and_update(force=True)
def _worker(self) -> None:
"""Background worker that runs the update loop."""
# Initial update on startup
logger.info("Performing initial cache load...")
self._check_and_update(force=True)
# Periodic updates
while not self._stop_event.wait(self.interval):
try:
self._check_and_update()
except Exception as e:
logger.error(f"Cache update failed: {e}", exc_info=True)
def _check_and_update(self, force: bool = False) -> bool:
"""Check SYS_DATE and update cache if needed.
Args:
force: If True, update regardless of SYS_DATE.
Returns:
True if cache was updated.
"""
if not redis_available():
logger.warning("Redis not available, skipping cache update")
return False
try:
# Get current SYS_DATE from Oracle
oracle_sys_date = self._check_sys_date()
if oracle_sys_date is None:
logger.error("Failed to get SYS_DATE from Oracle")
return False
# Get cached SYS_DATE from Redis
cached_sys_date = self._get_cached_sys_date()
# Compare and decide whether to update
if not force and cached_sys_date == oracle_sys_date:
logger.debug(f"SYS_DATE unchanged ({oracle_sys_date}), skipping update")
return False
logger.info(f"SYS_DATE changed: {cached_sys_date} -> {oracle_sys_date}, updating cache...")
# Load full table and update Redis
df = self._load_full_table()
if df is None or df.empty:
logger.error("Failed to load data from Oracle")
return False
success = self._update_redis_cache(df, oracle_sys_date)
if success:
logger.info(f"Cache updated successfully ({len(df)} rows)")
return success
except Exception as e:
logger.error(f"Error in cache update: {e}", exc_info=True)
return False
def _check_sys_date(self) -> Optional[str]:
"""Query Oracle for MAX(SYS_DATE).
Returns:
SYS_DATE string or None if query failed.
"""
sql = f"SELECT MAX(SYS_DATE) as SYS_DATE FROM {WIP_VIEW}"
try:
df = read_sql_df(sql)
if df is not None and not df.empty:
sys_date = df.iloc[0]['SYS_DATE']
return str(sys_date) if sys_date else None
return None
except Exception as e:
logger.error(f"Failed to query SYS_DATE: {e}")
return None
def _get_cached_sys_date(self) -> Optional[str]:
"""Get cached SYS_DATE from Redis.
Returns:
Cached SYS_DATE string or None.
"""
client = get_redis_client()
if client is None:
return None
try:
return client.get(get_key("meta:sys_date"))
except Exception as e:
logger.warning(f"Failed to get cached SYS_DATE: {e}")
return None
def _load_full_table(self) -> Optional[pd.DataFrame]:
"""Load entire DW_PJ_LOT_V table from Oracle.
Returns:
DataFrame with all rows, or None if failed.
"""
sql = f"""
SELECT *
FROM {WIP_VIEW}
WHERE WORKORDER IS NOT NULL
"""
try:
df = read_sql_df(sql)
return df
except Exception as e:
logger.error(f"Failed to load full table: {e}")
return None
def _update_redis_cache(self, df: pd.DataFrame, sys_date: str) -> bool:
"""Update Redis cache with new data using pipeline for atomicity.
Args:
df: DataFrame with full table data.
sys_date: Current SYS_DATE from Oracle.
Returns:
True if update was successful.
"""
client = get_redis_client()
if client is None:
return False
try:
# Convert DataFrame to JSON
# Handle datetime columns
for col in df.select_dtypes(include=['datetime64']).columns:
df[col] = df[col].astype(str)
data_json = df.to_json(orient='records', force_ascii=False)
# Atomic update using pipeline
now = datetime.now().isoformat()
pipe = client.pipeline()
pipe.set(get_key("data"), data_json)
pipe.set(get_key("meta:sys_date"), sys_date)
pipe.set(get_key("meta:updated_at"), now)
pipe.execute()
return True
except Exception as e:
logger.error(f"Failed to update Redis cache: {e}")
return False
# ============================================================
# Global Instance
# ============================================================
_CACHE_UPDATER: Optional[CacheUpdater] = None
def get_cache_updater() -> CacheUpdater:
"""Get or create the global cache updater instance."""
global _CACHE_UPDATER
if _CACHE_UPDATER is None:
_CACHE_UPDATER = CacheUpdater()
return _CACHE_UPDATER
def start_cache_updater() -> None:
"""Start the global cache updater."""
get_cache_updater().start()
def stop_cache_updater() -> None:
"""Stop the global cache updater."""
if _CACHE_UPDATER is not None:
_CACHE_UPDATER.stop()
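`_check_and_update` ultimately reduces to a watermark comparison on SYS_DATE; a minimal sketch of that predicate (the function name is illustrative):

```python
def should_update(cached_sys_date, oracle_sys_date, force=False):
    # Refresh when forced, on first load (no cached watermark),
    # or when the DWH view's SYS_DATE has advanced.
    return force or cached_sys_date != oracle_sys_date
```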

View File

@@ -71,7 +71,10 @@ def _register_pool_events(engine):
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
# Set call_timeout to prevent queries from blocking workers indefinitely
# 55 seconds (must be less than Gunicorn's 65s worker timeout)
dbapi_conn.call_timeout = 55000 # milliseconds
logger.debug("Connection checked out from pool (call_timeout=55s)")
@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_record):

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
"""Redis client management for MES Dashboard WIP cache."""
from __future__ import annotations
import logging
import os
from typing import Optional
import redis
logger = logging.getLogger('mes_dashboard.redis')
# ============================================================
# Configuration from environment variables
# ============================================================
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
REDIS_ENABLED = os.getenv('REDIS_ENABLED', 'true').lower() == 'true'
REDIS_KEY_PREFIX = os.getenv('REDIS_KEY_PREFIX', 'mes_wip')
# ============================================================
# Redis Client Singleton
# ============================================================
_REDIS_CLIENT: Optional[redis.Redis] = None
def get_redis_client() -> Optional[redis.Redis]:
"""Get Redis client with connection pooling and health check.
Returns:
Redis client instance, or None if Redis is disabled or unavailable.
"""
global _REDIS_CLIENT
if not REDIS_ENABLED:
logger.debug("Redis is disabled via REDIS_ENABLED=false")
return None
if _REDIS_CLIENT is None:
try:
_REDIS_CLIENT = redis.Redis.from_url(
REDIS_URL,
decode_responses=True,
socket_timeout=5,
socket_connect_timeout=5,
retry_on_timeout=True,
health_check_interval=30
)
# Test connection
_REDIS_CLIENT.ping()
logger.info(f"Redis client connected to {REDIS_URL}")
except redis.RedisError as e:
logger.warning(f"Failed to connect to Redis: {e}")
_REDIS_CLIENT = None
return None
return _REDIS_CLIENT
def redis_available() -> bool:
"""Check if Redis connection is available.
Returns:
True if Redis is enabled and responding to PING.
"""
if not REDIS_ENABLED:
return False
client = get_redis_client()
if client is None:
return False
try:
client.ping()
return True
except redis.RedisError as e:
logger.warning(f"Redis health check failed: {e}")
return False
def get_key(key: str) -> str:
"""Get full Redis key with prefix.
Args:
key: Key name without prefix (e.g., "meta:sys_date")
Returns:
Full key with prefix (e.g., "mes_wip:meta:sys_date")
"""
return f"{REDIS_KEY_PREFIX}:{key}"
def close_redis() -> None:
"""Close Redis connection.
Call this during application shutdown.
"""
global _REDIS_CLIENT
if _REDIS_CLIENT is not None:
try:
_REDIS_CLIENT.close()
logger.info("Redis connection closed")
except Exception as e:
logger.warning(f"Error closing Redis connection: {e}")
finally:
_REDIS_CLIENT = None
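A subtlety in `get_redis_client` above: on a failed connect the singleton is reset to None before returning, so the next caller retries instead of caching the failure forever. The pattern in isolation, with a stand-in `connect` callable in place of `redis.Redis.from_url`:

```python
_CLIENT = None

def get_client(connect):
    # Lazy singleton: create on first use; a failed connect leaves
    # _CLIENT as None so a later call can retry.
    global _CLIENT
    if _CLIENT is None:
        try:
            _CLIENT = connect()
        except ConnectionError:
            return None
    return _CLIENT

attempts = {"n": 0}

def flaky_connect():
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise ConnectionError("redis down")
    return "client"

first = get_client(flaky_connect)   # fails, returns None
second = get_client(flaky_connect)  # retry succeeds
```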

View File

@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
"""Health check endpoints for MES Dashboard.
Provides /health endpoint for monitoring service status.
"""
from __future__ import annotations
import logging
from flask import Blueprint, jsonify
from mes_dashboard.core.database import get_engine
from mes_dashboard.core.redis_client import (
get_redis_client,
redis_available,
REDIS_ENABLED
)
from mes_dashboard.core.cache import (
get_cached_sys_date,
get_cache_updated_at
)
from sqlalchemy import text
logger = logging.getLogger('mes_dashboard.health')
health_bp = Blueprint('health', __name__)
def check_database() -> tuple[str, str | None]:
"""Check database connectivity.
Returns:
Tuple of (status, error_message).
status is 'ok' or 'error'.
"""
try:
engine = get_engine()
with engine.connect() as conn:
conn.execute(text("SELECT 1 FROM DUAL"))
return 'ok', None
except Exception as e:
logger.error(f"Database health check failed: {e}")
return 'error', str(e)
def check_redis() -> tuple[str, str | None]:
"""Check Redis connectivity.
Returns:
Tuple of (status, error_message).
status is 'ok', 'error', or 'disabled'.
"""
if not REDIS_ENABLED:
return 'disabled', None
try:
client = get_redis_client()
if client is None:
return 'error', 'Failed to get Redis client'
client.ping()
return 'ok', None
except Exception as e:
logger.warning(f"Redis health check failed: {e}")
return 'error', str(e)
def get_cache_status() -> dict:
"""Get current cache status.
Returns:
Dict with cache status information.
"""
return {
'enabled': REDIS_ENABLED,
'sys_date': get_cached_sys_date(),
'updated_at': get_cache_updated_at()
}
@health_bp.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint.
Returns:
- 200 OK: All services healthy or degraded (Redis down but DB ok)
- 503 Service Unavailable: Database unhealthy
"""
db_status, db_error = check_database()
redis_status, redis_error = check_redis()
services = {
'database': db_status,
'redis': redis_status
}
errors = []
warnings = []
# Determine overall status
if db_status == 'error':
status = 'unhealthy'
http_code = 503
if db_error:
errors.append(f"Database connection failed: {db_error}")
elif redis_status == 'error':
# Redis down is degraded, not unhealthy (fallback available)
status = 'degraded'
http_code = 200
warnings.append("Redis unavailable, running in fallback mode")
else:
status = 'healthy'
http_code = 200
response = {
'status': status,
'services': services,
'cache': get_cache_status()
}
if errors:
response['errors'] = errors
if warnings:
response['warnings'] = warnings
return jsonify(response), http_code
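The status/HTTP-code mapping above (database down → unhealthy/503, Redis down → degraded/200, otherwise healthy) is small enough to factor into a pure function; a sketch:

```python
def overall_status(db_status, redis_status):
    # Database failure is fatal; Redis failure only degrades service,
    # since every WIP endpoint retains an Oracle fallback path.
    if db_status == "error":
        return "unhealthy", 503
    if redis_status == "error":
        return "degraded", 200
    return "healthy", 200
```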

File diff suppressed because it is too large

View File

@@ -74,6 +74,142 @@
opacity: 0.9;
}
/* Health Status Indicator */
.health-status {
display: flex;
align-items: center;
gap: 8px;
padding: 6px 12px;
background: rgba(255, 255, 255, 0.15);
border-radius: 6px;
font-size: 12px;
cursor: pointer;
transition: background 0.2s ease;
}
.health-status:hover {
background: rgba(255, 255, 255, 0.25);
}
.health-dot {
width: 8px;
height: 8px;
border-radius: 50%;
background: #9ca3af;
transition: background 0.3s ease;
}
.health-dot.healthy {
background: #22c55e;
box-shadow: 0 0 6px rgba(34, 197, 94, 0.6);
}
.health-dot.degraded {
background: #f59e0b;
box-shadow: 0 0 6px rgba(245, 158, 11, 0.6);
}
.health-dot.unhealthy {
background: #ef4444;
box-shadow: 0 0 6px rgba(239, 68, 68, 0.6);
}
.health-dot.loading {
background: #9ca3af;
animation: pulse 1.5s infinite;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.4; }
}
.health-label {
opacity: 0.9;
}
/* Health Popup */
.health-popup {
position: absolute;
top: 100%;
right: 0;
margin-top: 8px;
background: white;
border-radius: 8px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
padding: 16px;
min-width: 280px;
z-index: 1000;
display: none;
color: #333;
}
.health-popup.show {
display: block;
}
.health-popup h4 {
margin: 0 0 12px 0;
font-size: 14px;
color: #667eea;
border-bottom: 1px solid #e5e7eb;
padding-bottom: 8px;
}
.health-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 8px 0;
font-size: 13px;
}
.health-item:not(:last-child) {
border-bottom: 1px solid #f3f4f6;
}
.health-item-label {
color: #6b7280;
}
.health-item-value {
font-weight: 500;
display: flex;
align-items: center;
gap: 6px;
}
.health-item-value.ok {
color: #22c55e;
}
.health-item-value.error {
color: #ef4444;
}
.health-item-value.disabled {
color: #9ca3af;
}
.health-cache-info {
margin-top: 12px;
padding-top: 12px;
border-top: 1px solid #e5e7eb;
font-size: 12px;
color: #6b7280;
}
.health-cache-info div {
margin-bottom: 4px;
}
.header-right {
display: flex;
align-items: center;
gap: 12px;
position: relative;
}
.tabs {
display: flex;
gap: 10px;
@@ -129,6 +265,27 @@
<p>統一入口WIP 即時看板、機台狀態報表與數據表查詢工具</p>
</div>
<div class="header-right">
<!-- Health Status Indicator -->
<div class="health-status" id="healthStatus" onclick="toggleHealthPopup()">
<span class="health-dot loading" id="healthDot"></span>
<span class="health-label" id="healthLabel">檢查中...</span>
</div>
<div class="health-popup" id="healthPopup">
<h4>系統連線狀態</h4>
<div class="health-item">
<span class="health-item-label">資料庫 (Oracle)</span>
<span class="health-item-value" id="dbStatus">--</span>
</div>
<div class="health-item">
<span class="health-item-label">快取 (Redis)</span>
<span class="health-item-value" id="redisStatus">--</span>
</div>
<div class="health-cache-info" id="cacheInfo">
<div>快取狀態:<span id="cacheEnabled">--</span></div>
<div>資料更新時間:<span id="cacheSysDate">--</span></div>
<div>最後同步:<span id="cacheUpdatedAt">--</span></div>
</div>
</div>
<div class="admin-status">
{% if is_admin %}
<span class="admin-name">{{ admin_user.displayName }}</span>
@@ -228,5 +385,113 @@
window.addEventListener('resize', setFrameHeight);
setFrameHeight();
// ============================================================
// Health Status Indicator
// ============================================================
const healthDot = document.getElementById('healthDot');
const healthLabel = document.getElementById('healthLabel');
const healthPopup = document.getElementById('healthPopup');
const dbStatus = document.getElementById('dbStatus');
const redisStatus = document.getElementById('redisStatus');
const cacheEnabled = document.getElementById('cacheEnabled');
const cacheSysDate = document.getElementById('cacheSysDate');
const cacheUpdatedAt = document.getElementById('cacheUpdatedAt');
function toggleHealthPopup() {
healthPopup.classList.toggle('show');
}
// Close popup when clicking outside
document.addEventListener('click', (e) => {
if (!e.target.closest('#healthStatus') && !e.target.closest('#healthPopup')) {
healthPopup.classList.remove('show');
}
});
function formatStatus(status) {
const icons = {
'ok': '✓',
'error': '✗',
'disabled': '○'
};
return icons[status] || status;
}
function setStatusClass(element, status) {
element.classList.remove('ok', 'error', 'disabled');
element.classList.add(status === 'ok' ? 'ok' : status === 'error' ? 'error' : 'disabled');
}
function formatDateTime(dateStr) {
if (!dateStr) return '--';
try {
const date = new Date(dateStr);
if (isNaN(date.getTime())) return dateStr;
return date.toLocaleString('zh-TW', {
month: '2-digit',
day: '2-digit',
hour: '2-digit',
minute: '2-digit'
});
} catch {
return dateStr;
}
}
async function checkHealth() {
try {
const response = await fetch('/health', { signal: AbortSignal.timeout(10000) });
const data = await response.json();
// Update main indicator
healthDot.classList.remove('loading', 'healthy', 'degraded', 'unhealthy');
if (data.status === 'healthy') {
healthDot.classList.add('healthy');
healthLabel.textContent = '連線正常';
} else if (data.status === 'degraded') {
healthDot.classList.add('degraded');
healthLabel.textContent = '部分降級';
} else {
healthDot.classList.add('unhealthy');
healthLabel.textContent = '連線異常';
}
// Update database status
const dbState = data.services?.database || 'error';
dbStatus.innerHTML = `${formatStatus(dbState)} ${dbState === 'ok' ? '正常' : '異常'}`;
setStatusClass(dbStatus, dbState);
// Update Redis status
const redisState = data.services?.redis || 'disabled';
let redisText = redisState === 'ok' ? '正常' : redisState === 'disabled' ? '未啟用' : '異常';
redisStatus.innerHTML = `${formatStatus(redisState)} ${redisText}`;
setStatusClass(redisStatus, redisState);
// Update cache info
const cache = data.cache || {};
cacheEnabled.textContent = cache.enabled ? '已啟用' : '未啟用';
cacheSysDate.textContent = cache.sys_date || '--';
cacheUpdatedAt.textContent = formatDateTime(cache.updated_at);
} catch (error) {
console.error('Health check failed:', error);
healthDot.classList.remove('loading', 'healthy', 'degraded');
healthDot.classList.add('unhealthy');
healthLabel.textContent = '無法連線';
dbStatus.innerHTML = '✗ 無法確認';
setStatusClass(dbStatus, 'error');
redisStatus.innerHTML = '✗ 無法確認';
setStatusClass(redisStatus, 'error');
}
}
// Initial check
checkHealth();
// Periodic check every 30 seconds
setInterval(checkHealth, 30000);
</script>
{% endblock %}

View File

@@ -38,6 +38,12 @@ def pytest_configure(config):
config.addinivalue_line(
"markers", "integration: mark test as integration test (requires database)"
)
config.addinivalue_line(
"markers", "e2e: mark test as end-to-end test (requires running server)"
)
config.addinivalue_line(
"markers", "redis: mark test as requiring Redis connection"
)
def pytest_addoption(parser):
@@ -48,14 +54,24 @@ def pytest_addoption(parser):
default=False,
help="Run integration tests that require database connection"
)
parser.addoption(
"--run-e2e",
action="store_true",
default=False,
help="Run end-to-end tests that require running server"
)
def pytest_collection_modifyitems(config, items):
"""Skip integration/e2e tests unless explicitly enabled."""
run_integration = config.getoption("--run-integration")
run_e2e = config.getoption("--run-e2e")
skip_integration = pytest.mark.skip(reason="need --run-integration option to run")
skip_e2e = pytest.mark.skip(reason="need --run-e2e option to run")
for item in items:
if "integration" in item.keywords and not run_integration:
item.add_marker(skip_integration)
if "e2e" in item.keywords and not run_e2e:
item.add_marker(skip_e2e)

View File

@@ -15,7 +15,7 @@ def app_server() -> str:
Uses environment variable E2E_BASE_URL or defaults to production server.
"""
return os.environ.get('E2E_BASE_URL', 'http://127.0.0.1:8080')
@pytest.fixture(scope="session")
@@ -33,3 +33,18 @@ def pytest_configure(config):
config.addinivalue_line(
"markers", "e2e: mark test as end-to-end test (requires running server)"
)
config.addinivalue_line(
"markers", "redis: mark test as requiring Redis connection"
)
@pytest.fixture(scope="session")
def api_base_url(app_server):
"""Get the API base URL."""
return f"{app_server}/api"
@pytest.fixture(scope="session")
def health_url(app_server):
"""Get the health check URL."""
return f"{app_server}/health"

tests/e2e/test_cache_e2e.py Normal file
View File

@@ -0,0 +1,281 @@
# -*- coding: utf-8 -*-
"""End-to-end tests for Redis cache functionality.
These tests require a running server with Redis enabled.
Run with: pytest tests/e2e/test_cache_e2e.py -v
"""
import pytest
import requests
import time
@pytest.mark.e2e
class TestHealthEndpointE2E:
"""E2E tests for /health endpoint."""
def test_health_endpoint_accessible(self, health_url):
"""Test health endpoint is accessible."""
response = requests.get(health_url, timeout=10)
assert response.status_code in [200, 503]
data = response.json()
assert 'status' in data
assert 'services' in data
assert 'cache' in data
def test_health_shows_database_status(self, health_url):
"""Test health endpoint shows database status."""
response = requests.get(health_url, timeout=10)
data = response.json()
assert 'database' in data['services']
assert data['services']['database'] in ['ok', 'error']
def test_health_shows_redis_status(self, health_url):
"""Test health endpoint shows Redis status."""
response = requests.get(health_url, timeout=10)
data = response.json()
assert 'redis' in data['services']
assert data['services']['redis'] in ['ok', 'error', 'disabled']
def test_health_shows_cache_info(self, health_url):
"""Test health endpoint shows cache information."""
response = requests.get(health_url, timeout=10)
data = response.json()
assert 'cache' in data
assert 'enabled' in data['cache']
assert 'sys_date' in data['cache']
assert 'updated_at' in data['cache']
@pytest.mark.e2e
@pytest.mark.redis
class TestCachedWipApiE2E:
"""E2E tests for cached WIP API endpoints."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
return resp_json['data']
return resp_json
def test_wip_summary_returns_data(self, api_base_url):
"""Test WIP summary endpoint returns valid data."""
response = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'totalLots' in data
assert 'totalQtyPcs' in data
assert 'byWipStatus' in data
assert 'dataUpdateDate' in data
def test_wip_summary_status_breakdown(self, api_base_url):
"""Test WIP summary contains correct status breakdown."""
response = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
data = self._unwrap(response.json())
by_status = data['byWipStatus']
assert 'run' in by_status
assert 'queue' in by_status
assert 'hold' in by_status
assert 'qualityHold' in by_status
assert 'nonQualityHold' in by_status
# Each status should have lots and qtyPcs
for status in ['run', 'queue', 'hold']:
assert 'lots' in by_status[status]
assert 'qtyPcs' in by_status[status]
def test_wip_matrix_returns_data(self, api_base_url):
"""Test WIP matrix endpoint returns valid data."""
response = requests.get(f"{api_base_url}/wip/overview/matrix", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'workcenters' in data
assert 'packages' in data
assert 'matrix' in data
assert 'workcenter_totals' in data
assert 'package_totals' in data
assert 'grand_total' in data
def test_wip_workcenters_returns_list(self, api_base_url):
"""Test workcenters endpoint returns list."""
response = requests.get(f"{api_base_url}/wip/meta/workcenters", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
if len(data) > 0:
assert 'name' in data[0]
assert 'lot_count' in data[0]
def test_wip_packages_returns_list(self, api_base_url):
"""Test packages endpoint returns list."""
response = requests.get(f"{api_base_url}/wip/meta/packages", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
if len(data) > 0:
assert 'name' in data[0]
assert 'lot_count' in data[0]
def test_wip_hold_summary_returns_data(self, api_base_url):
"""Test hold summary endpoint returns valid data."""
response = requests.get(f"{api_base_url}/wip/overview/hold", timeout=30)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'items' in data
assert isinstance(data['items'], list)
@pytest.mark.e2e
@pytest.mark.redis
class TestCachePerformanceE2E:
"""E2E tests for cache performance."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
return resp_json['data']
return resp_json
def test_cached_response_is_fast(self, api_base_url):
"""Test cached responses are faster than 2 seconds."""
# First request may load cache
requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
# Second request should be from cache
start = time.time()
response = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
elapsed = time.time() - start
assert response.status_code == 200
# Cached response should be fast (< 2 seconds)
assert elapsed < 2.0, f"Response took {elapsed:.2f}s, expected < 2s"
def test_multiple_endpoints_consistent(self, api_base_url):
"""Test multiple endpoints return consistent data."""
# Get summary
summary_resp = requests.get(f"{api_base_url}/wip/overview/summary", timeout=30)
summary = self._unwrap(summary_resp.json())
# Get matrix
matrix_resp = requests.get(f"{api_base_url}/wip/overview/matrix", timeout=30)
matrix = self._unwrap(matrix_resp.json())
        # Grand total from matrix should track the summary totals; slight
        # differences are possible due to filtering, so only sanity-check signs.
        if summary['totalLots'] > 0 and matrix['grand_total'] > 0:
            assert summary['totalQtyPcs'] > 0
            assert matrix['grand_total'] > 0
@pytest.mark.e2e
@pytest.mark.redis
class TestSearchEndpointsE2E:
"""E2E tests for search endpoints with cache."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
data = resp_json['data']
# Search returns {'items': [...]}
if isinstance(data, dict) and 'items' in data:
return data['items']
return data
return resp_json
def test_search_workorders(self, api_base_url):
"""Test workorder search returns results."""
# Use a common pattern that should exist
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'workorder', 'q': 'WO', 'limit': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
def test_search_lotids(self, api_base_url):
"""Test lot ID search returns results."""
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'lotid', 'q': 'LOT', 'limit': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert isinstance(data, list)
def test_search_with_short_query_returns_empty(self, api_base_url):
"""Test search with short query returns empty list."""
response = requests.get(
f"{api_base_url}/wip/meta/search",
params={'type': 'workorder', 'q': 'W'}, # Too short
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert data == []
@pytest.mark.e2e
@pytest.mark.redis
class TestWipDetailE2E:
"""E2E tests for WIP detail endpoint with cache."""
def _unwrap(self, resp_json):
"""Unwrap API response to get data."""
if isinstance(resp_json, dict) and 'data' in resp_json:
return resp_json['data']
return resp_json
def test_wip_detail_with_workcenter(self, api_base_url):
"""Test WIP detail endpoint for a workcenter."""
# First get list of workcenters
wc_resp = requests.get(f"{api_base_url}/wip/meta/workcenters", timeout=30)
workcenters = self._unwrap(wc_resp.json())
if len(workcenters) > 0:
wc_name = workcenters[0]['name']
response = requests.get(
f"{api_base_url}/wip/detail/{wc_name}",
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert 'workcenter' in data
assert 'summary' in data
assert 'lots' in data
assert 'pagination' in data
def test_wip_detail_pagination(self, api_base_url):
"""Test WIP detail pagination."""
wc_resp = requests.get(f"{api_base_url}/wip/meta/workcenters", timeout=30)
workcenters = self._unwrap(wc_resp.json())
if len(workcenters) > 0:
wc_name = workcenters[0]['name']
response = requests.get(
f"{api_base_url}/wip/detail/{wc_name}",
params={'page': 1, 'page_size': 10},
timeout=30
)
assert response.status_code == 200
data = self._unwrap(response.json())
assert data['pagination']['page'] == 1
assert data['pagination']['page_size'] == 10
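The `_unwrap` helper above is duplicated in each E2E test class. A shared helper (e.g. in a hypothetical `conftest.py`; the name `unwrap` and the `items` flag are illustrative, not part of this commit) could consolidate both variants, including the `{'data': {'items': [...]}}` shape the search endpoints return:

```python
def unwrap(resp_json, items=False):
    """Return the payload from a wrapped API response ({'success': ..., 'data': ...}).

    With items=True, also unwrap the {'items': [...]} envelope used by search.
    Non-dict responses are returned unchanged.
    """
    data = resp_json.get('data', resp_json) if isinstance(resp_json, dict) else resp_json
    if items and isinstance(data, dict) and 'items' in data:
        return data['items']
    return data
```

Each test class could then drop its private `_unwrap` method in favor of this one function.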

tests/test_cache.py Normal file
@@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
"""Unit tests for cache module.
Tests cache read/write functionality and fallback mechanism.
"""
import pytest
from unittest.mock import patch, MagicMock
import pandas as pd
import json
class TestGetCachedWipData:
"""Test get_cached_wip_data function."""
@pytest.fixture(autouse=True)
def reset_redis(self):
"""Reset Redis client state."""
import mes_dashboard.core.redis_client as rc
rc._REDIS_CLIENT = None
yield
rc._REDIS_CLIENT = None
def test_returns_none_when_redis_disabled(self):
"""Test returns None when Redis is disabled."""
import mes_dashboard.core.cache as cache
with patch.object(cache, 'REDIS_ENABLED', False):
result = cache.get_cached_wip_data()
assert result is None
def test_returns_none_when_client_unavailable(self):
"""Test returns None when Redis client is unavailable."""
import mes_dashboard.core.cache as cache
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=None):
result = cache.get_cached_wip_data()
assert result is None
def test_returns_none_when_cache_miss(self, reset_redis):
"""Test returns None when cache key doesn't exist."""
import mes_dashboard.core.cache as cache
mock_client = MagicMock()
mock_client.get.return_value = None
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=mock_client):
result = cache.get_cached_wip_data()
assert result is None
def test_returns_dataframe_from_cache(self, reset_redis):
"""Test returns DataFrame when cache hit."""
import mes_dashboard.core.cache as cache
# Create test data as JSON string (what Redis returns with decode_responses=True)
test_data = [
{'LOTID': 'LOT001', 'QTY': 100, 'WORKORDER': 'WO001'},
{'LOTID': 'LOT002', 'QTY': 200, 'WORKORDER': 'WO002'}
]
cached_json = json.dumps(test_data)
mock_client = MagicMock()
mock_client.get.return_value = cached_json # String, not bytes
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=mock_client):
with patch.object(cache, 'get_key', return_value='mes_wip:data'):
result = cache.get_cached_wip_data()
assert result is not None
assert isinstance(result, pd.DataFrame)
assert len(result) == 2
assert 'LOTID' in result.columns
def test_handles_invalid_json(self, reset_redis):
"""Test handles invalid JSON gracefully."""
import mes_dashboard.core.cache as cache
mock_client = MagicMock()
mock_client.get.return_value = 'invalid json {'
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=mock_client):
with patch.object(cache, 'get_key', return_value='mes_wip:data'):
result = cache.get_cached_wip_data()
assert result is None
class TestGetCachedSysDate:
"""Test get_cached_sys_date function."""
def test_returns_none_when_redis_disabled(self):
"""Test returns None when Redis is disabled."""
import mes_dashboard.core.cache as cache
with patch.object(cache, 'REDIS_ENABLED', False):
result = cache.get_cached_sys_date()
assert result is None
def test_returns_sys_date_from_cache(self):
"""Test returns SYS_DATE when cache hit."""
import mes_dashboard.core.cache as cache
mock_client = MagicMock()
mock_client.get.return_value = '2024-01-15 10:30:00' # String, not bytes
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=mock_client):
with patch.object(cache, 'get_key', return_value='mes_wip:meta:sys_date'):
result = cache.get_cached_sys_date()
assert result == '2024-01-15 10:30:00'
class TestGetCacheUpdatedAt:
"""Test get_cache_updated_at function."""
def test_returns_none_when_redis_disabled(self):
"""Test returns None when Redis is disabled."""
import mes_dashboard.core.cache as cache
with patch.object(cache, 'REDIS_ENABLED', False):
result = cache.get_cache_updated_at()
assert result is None
def test_returns_updated_at_from_cache(self):
"""Test returns updated_at timestamp when cache hit."""
import mes_dashboard.core.cache as cache
mock_client = MagicMock()
mock_client.get.return_value = '2024-01-15T10:30:00' # String, not bytes
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=mock_client):
with patch.object(cache, 'get_key', return_value='mes_wip:meta:updated_at'):
result = cache.get_cache_updated_at()
assert result == '2024-01-15T10:30:00'
class TestWipDataWithFallback:
"""Test get_wip_data_with_fallback function."""
def test_uses_cache_when_available(self):
"""Test uses cache when data is available."""
import mes_dashboard.core.cache as cache
cached_df = pd.DataFrame({
'LOTID': ['LOT001'],
'QTY': [100]
})
mock_fallback = MagicMock()
with patch.object(cache, 'get_cached_wip_data', return_value=cached_df):
result = cache.get_wip_data_with_fallback(mock_fallback)
assert result is not None
assert len(result) == 1
# Fallback should NOT be called
mock_fallback.assert_not_called()
def test_fallback_when_cache_unavailable(self):
"""Test falls back when cache is unavailable."""
import mes_dashboard.core.cache as cache
oracle_df = pd.DataFrame({
'LOTID': ['LOT001', 'LOT002'],
'QTY': [100, 200]
})
mock_fallback = MagicMock(return_value=oracle_df)
with patch.object(cache, 'get_cached_wip_data', return_value=None):
result = cache.get_wip_data_with_fallback(mock_fallback)
assert result is not None
assert len(result) == 2
mock_fallback.assert_called_once()
class TestNoOpCache:
"""Test NoOpCache fallback class."""
def test_noop_cache_get(self):
"""Test NoOpCache.get returns None."""
from mes_dashboard.core.cache import NoOpCache
cache = NoOpCache()
result = cache.get('any_key')
assert result is None
def test_noop_cache_set(self):
"""Test NoOpCache.set returns None."""
from mes_dashboard.core.cache import NoOpCache
cache = NoOpCache()
result = cache.set('any_key', 'any_value', 300)
assert result is None
class TestIsCacheAvailable:
"""Test is_cache_available function."""
def test_returns_false_when_disabled(self):
"""Test returns False when Redis is disabled."""
import mes_dashboard.core.cache as cache
with patch.object(cache, 'REDIS_ENABLED', False):
result = cache.is_cache_available()
assert result is False
def test_returns_false_when_no_client(self):
"""Test returns False when no Redis client."""
import mes_dashboard.core.cache as cache
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=None):
result = cache.is_cache_available()
assert result is False
def test_returns_true_when_data_exists(self):
"""Test returns True when data exists in Redis."""
import mes_dashboard.core.cache as cache
mock_client = MagicMock()
mock_client.exists.return_value = 1
with patch.object(cache, 'REDIS_ENABLED', True):
with patch.object(cache, 'get_redis_client', return_value=mock_client):
with patch.object(cache, 'get_key', return_value='mes_wip:data'):
result = cache.is_cache_available()
assert result is True
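The fallback behavior these tests pin down can be summarized in a small sketch. This is not the code in `mes_dashboard.core.cache`, only a simplified shape inferred from the tests (the real `get_wip_data_with_fallback` reads the cache itself rather than taking a `cached` argument):

```python
class NoOpCache:
    """Fallback cache used when Redis is disabled: every operation is a no-op."""

    def get(self, key):
        return None

    def set(self, key, value, ttl=None):
        return None


def get_wip_data_with_fallback(oracle_loader, cached=None):
    """Return cached data when present; otherwise invoke the Oracle loader."""
    if cached is not None:
        return cached  # cache hit: Oracle is never touched
    return oracle_loader()
```

This mirrors the two key assertions above: on a cache hit the fallback is never called, and on a miss it is called exactly once.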
@@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-
"""Integration tests for cache functionality.
Tests API endpoints with cache enabled/disabled scenarios.
"""
import pytest
from unittest.mock import patch, MagicMock
import pandas as pd
import json
@pytest.fixture
def app_with_mock_cache():
"""Create app with mocked cache."""
import mes_dashboard.core.database as db
db._ENGINE = None
from mes_dashboard.app import create_app
app = create_app('testing')
app.config['TESTING'] = True
return app
class TestHealthEndpoint:
"""Test /health endpoint."""
@patch('mes_dashboard.routes.health_routes.check_database')
@patch('mes_dashboard.routes.health_routes.check_redis')
@patch('mes_dashboard.routes.health_routes.get_cache_status')
def test_health_all_ok(self, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache):
"""Test health endpoint returns 200 when all services are healthy."""
mock_check_db.return_value = ('ok', None)
mock_check_redis.return_value = ('ok', None)
mock_cache_status.return_value = {
'enabled': True,
'sys_date': '2024-01-15 10:30:00',
'updated_at': '2024-01-15T10:30:00'
}
with app_with_mock_cache.test_client() as client:
response = client.get('/health')
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'healthy'
assert data['services']['database'] == 'ok'
assert data['services']['redis'] == 'ok'
@patch('mes_dashboard.routes.health_routes.check_database')
@patch('mes_dashboard.routes.health_routes.check_redis')
@patch('mes_dashboard.routes.health_routes.get_cache_status')
def test_health_redis_down_degraded(self, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache):
"""Test health endpoint returns 200 degraded when Redis is down."""
mock_check_db.return_value = ('ok', None)
mock_check_redis.return_value = ('error', 'Connection refused')
mock_cache_status.return_value = {'enabled': True, 'sys_date': None, 'updated_at': None}
with app_with_mock_cache.test_client() as client:
response = client.get('/health')
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'degraded'
assert 'warnings' in data
@patch('mes_dashboard.routes.health_routes.check_database')
@patch('mes_dashboard.routes.health_routes.check_redis')
@patch('mes_dashboard.routes.health_routes.get_cache_status')
def test_health_db_down_unhealthy(self, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache):
"""Test health endpoint returns 503 when database is down."""
mock_check_db.return_value = ('error', 'Connection refused')
mock_check_redis.return_value = ('ok', None)
mock_cache_status.return_value = {'enabled': True, 'sys_date': None, 'updated_at': None}
with app_with_mock_cache.test_client() as client:
response = client.get('/health')
assert response.status_code == 503
data = response.get_json()
assert data['status'] == 'unhealthy'
assert 'errors' in data
@patch('mes_dashboard.routes.health_routes.check_database')
@patch('mes_dashboard.routes.health_routes.check_redis')
@patch('mes_dashboard.routes.health_routes.get_cache_status')
def test_health_redis_disabled(self, mock_cache_status, mock_check_redis, mock_check_db, app_with_mock_cache):
"""Test health endpoint shows Redis disabled status."""
mock_check_db.return_value = ('ok', None)
mock_check_redis.return_value = ('disabled', None)
mock_cache_status.return_value = {'enabled': False, 'sys_date': None, 'updated_at': None}
with app_with_mock_cache.test_client() as client:
response = client.get('/health')
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'healthy'
assert data['services']['redis'] == 'disabled'
class TestWipApiWithCache:
"""Test WIP API endpoints with cache."""
@pytest.fixture
def mock_wip_cache_data(self):
"""Create mock WIP data for cache."""
return pd.DataFrame({
'LOTID': ['LOT001', 'LOT002', 'LOT003'],
'QTY': [100, 200, 150],
'WORKORDER': ['WO001', 'WO002', 'WO003'],
'WORKCENTER_GROUP': ['WC1', 'WC1', 'WC2'],
'WORKCENTERSEQUENCE_GROUP': [1, 1, 2],
'PRODUCTLINENAME': ['PKG1', 'PKG2', 'PKG1'],
'EQUIPMENTCOUNT': [1, 0, 0],
'CURRENTHOLDCOUNT': [0, 1, 0],
'HOLDREASONNAME': [None, 'Quality Issue', None],
'STATUS': ['ACTIVE', 'HOLD', 'ACTIVE'],
'SPECNAME': ['SPEC1', 'SPEC1', 'SPEC2'],
'SPECSEQUENCE': [1, 1, 2],
'AGEBYDAYS': [1.5, 3.2, 0.5],
'EQUIPMENTS': ['EQ001', None, None],
'SYS_DATE': ['2024-01-15 10:30:00'] * 3
})
@patch('mes_dashboard.services.wip_service._get_wip_dataframe')
@patch('mes_dashboard.services.wip_service.get_cached_sys_date')
def test_wip_summary_uses_cache(self, mock_sys_date, mock_get_df, app_with_mock_cache, mock_wip_cache_data):
"""Test /api/wip/overview/summary uses cache when available."""
mock_get_df.return_value = mock_wip_cache_data
mock_sys_date.return_value = '2024-01-15 10:30:00'
with app_with_mock_cache.test_client() as client:
response = client.get('/api/wip/overview/summary')
assert response.status_code == 200
resp = response.get_json()
# API returns wrapped response: {success: true, data: {...}}
data = resp.get('data', resp) # Handle both wrapped and unwrapped
assert data['totalLots'] == 3
assert data['dataUpdateDate'] == '2024-01-15 10:30:00'
@patch('mes_dashboard.services.wip_service._get_wip_dataframe')
@patch('mes_dashboard.services.wip_service.get_cached_sys_date')
def test_wip_matrix_uses_cache(self, mock_sys_date, mock_get_df, app_with_mock_cache, mock_wip_cache_data):
"""Test /api/wip/overview/matrix uses cache when available."""
mock_get_df.return_value = mock_wip_cache_data
mock_sys_date.return_value = '2024-01-15 10:30:00'
with app_with_mock_cache.test_client() as client:
response = client.get('/api/wip/overview/matrix')
assert response.status_code == 200
resp = response.get_json()
# API returns wrapped response: {success: true, data: {...}}
data = resp.get('data', resp)
assert 'workcenters' in data
assert 'packages' in data
assert 'matrix' in data
@patch('mes_dashboard.services.wip_service._get_wip_dataframe')
def test_workcenters_uses_cache(self, mock_get_df, app_with_mock_cache, mock_wip_cache_data):
"""Test /api/wip/meta/workcenters uses cache when available."""
mock_get_df.return_value = mock_wip_cache_data
with app_with_mock_cache.test_client() as client:
response = client.get('/api/wip/meta/workcenters')
assert response.status_code == 200
resp = response.get_json()
# API returns wrapped response: {success: true, data: [...]}
data = resp.get('data', resp) if isinstance(resp, dict) and 'data' in resp else resp
assert isinstance(data, list)
assert len(data) == 2 # WC1 and WC2
@patch('mes_dashboard.services.wip_service._get_wip_dataframe')
def test_packages_uses_cache(self, mock_get_df, app_with_mock_cache, mock_wip_cache_data):
"""Test /api/wip/meta/packages uses cache when available."""
mock_get_df.return_value = mock_wip_cache_data
with app_with_mock_cache.test_client() as client:
response = client.get('/api/wip/meta/packages')
assert response.status_code == 200
resp = response.get_json()
# API returns wrapped response: {success: true, data: [...]}
data = resp.get('data', resp) if isinstance(resp, dict) and 'data' in resp else resp
assert isinstance(data, list)
assert len(data) == 2 # PKG1 and PKG2
class TestFallbackToOracle:
"""Test fallback to Oracle when cache is unavailable."""
@patch('mes_dashboard.services.wip_service._get_wip_dataframe')
@patch('mes_dashboard.services.wip_service._get_wip_summary_from_oracle')
def test_summary_falls_back_to_oracle(self, mock_oracle, mock_get_df, app_with_mock_cache):
"""Test summary falls back to Oracle when cache unavailable."""
mock_get_df.return_value = None # Cache miss
mock_oracle.return_value = {
'totalLots': 100,
'totalQtyPcs': 10000,
'byWipStatus': {
'run': {'lots': 30, 'qtyPcs': 3000},
'queue': {'lots': 50, 'qtyPcs': 5000},
'hold': {'lots': 20, 'qtyPcs': 2000},
'qualityHold': {'lots': 15, 'qtyPcs': 1500},
'nonQualityHold': {'lots': 5, 'qtyPcs': 500}
},
'dataUpdateDate': '2024-01-15 10:30:00'
}
with app_with_mock_cache.test_client() as client:
response = client.get('/api/wip/overview/summary')
assert response.status_code == 200
resp = response.get_json()
# API returns wrapped response: {success: true, data: {...}}
data = resp.get('data', resp)
assert data['totalLots'] == 100
mock_oracle.assert_called_once()
@patch('mes_dashboard.services.wip_service._get_wip_dataframe')
@patch('mes_dashboard.services.wip_service._get_workcenters_from_oracle')
def test_workcenters_falls_back_to_oracle(self, mock_oracle, mock_get_df, app_with_mock_cache):
"""Test workcenters falls back to Oracle when cache unavailable."""
mock_get_df.return_value = None # Cache miss
mock_oracle.return_value = [
{'name': 'WC1', 'lot_count': 50},
{'name': 'WC2', 'lot_count': 30}
]
with app_with_mock_cache.test_client() as client:
response = client.get('/api/wip/meta/workcenters')
assert response.status_code == 200
resp = response.get_json()
# API returns wrapped response: {success: true, data: [...]}
data = resp.get('data', resp) if isinstance(resp, dict) and 'data' in resp else resp
assert len(data) == 2
mock_oracle.assert_called_once()

tests/test_cache_updater.py Normal file
@@ -0,0 +1,222 @@
# -*- coding: utf-8 -*-
"""Unit tests for cache updater module.
Tests background cache update logic.
"""
import pytest
from unittest.mock import patch, MagicMock
import pandas as pd
import time
class TestCacheUpdater:
"""Test CacheUpdater class."""
@pytest.fixture(autouse=True)
def reset_state(self):
"""Reset module state before each test."""
import mes_dashboard.core.redis_client as rc
rc._REDIS_CLIENT = None
yield
rc._REDIS_CLIENT = None
def test_updater_starts_when_redis_enabled(self, reset_state):
"""Test updater starts when Redis is enabled."""
import mes_dashboard.core.cache_updater as cu
mock_client = MagicMock()
mock_client.ping.return_value = True
with patch.object(cu, 'REDIS_ENABLED', True):
with patch.object(cu, 'redis_available', return_value=True):
with patch.object(cu, 'read_sql_df', return_value=None):
updater = cu.CacheUpdater(interval=1)
try:
updater.start()
assert updater._is_running is True
assert updater._thread is not None
finally:
updater.stop()
time.sleep(0.2)
def test_updater_does_not_start_when_redis_disabled(self, reset_state):
"""Test updater does not start when Redis is disabled."""
import mes_dashboard.core.cache_updater as cu
with patch.object(cu, 'REDIS_ENABLED', False):
updater = cu.CacheUpdater(interval=1)
updater.start()
assert updater._is_running is False
def test_updater_stops_gracefully(self, reset_state):
"""Test updater stops gracefully."""
import mes_dashboard.core.cache_updater as cu
mock_client = MagicMock()
mock_client.ping.return_value = True
with patch.object(cu, 'REDIS_ENABLED', True):
with patch.object(cu, 'redis_available', return_value=True):
with patch.object(cu, 'read_sql_df', return_value=None):
updater = cu.CacheUpdater(interval=1)
updater.start()
assert updater._is_running is True
updater.stop()
time.sleep(0.2) # Give thread time to stop
assert updater._is_running is False
class TestCheckSysDate:
"""Test SYS_DATE checking logic."""
def test_check_sys_date_returns_value(self):
"""Test _check_sys_date returns correct value."""
import mes_dashboard.core.cache_updater as cu
mock_df = pd.DataFrame({'SYS_DATE': ['2024-01-15 10:30:00']})
with patch.object(cu, 'read_sql_df', return_value=mock_df):
updater = cu.CacheUpdater()
result = updater._check_sys_date()
assert result == '2024-01-15 10:30:00'
def test_check_sys_date_handles_empty_result(self):
"""Test _check_sys_date handles empty result."""
import mes_dashboard.core.cache_updater as cu
with patch.object(cu, 'read_sql_df', return_value=pd.DataFrame()):
updater = cu.CacheUpdater()
result = updater._check_sys_date()
assert result is None
def test_check_sys_date_handles_none_result(self):
"""Test _check_sys_date handles None result."""
import mes_dashboard.core.cache_updater as cu
with patch.object(cu, 'read_sql_df', return_value=None):
updater = cu.CacheUpdater()
result = updater._check_sys_date()
assert result is None
def test_check_sys_date_handles_exception(self):
"""Test _check_sys_date handles database exception."""
import mes_dashboard.core.cache_updater as cu
with patch.object(cu, 'read_sql_df', side_effect=Exception("Database error")):
updater = cu.CacheUpdater()
result = updater._check_sys_date()
assert result is None
class TestLoadFullTable:
"""Test full table loading logic."""
def test_load_full_table_success(self):
"""Test _load_full_table loads data correctly."""
import mes_dashboard.core.cache_updater as cu
test_df = pd.DataFrame({
'LOTID': ['LOT001', 'LOT002'],
'QTY': [100, 200],
'WORKORDER': ['WO001', 'WO002']
})
with patch.object(cu, 'read_sql_df', return_value=test_df):
updater = cu.CacheUpdater()
result = updater._load_full_table()
assert result is not None
assert len(result) == 2
def test_load_full_table_handles_none(self):
"""Test _load_full_table handles None result."""
import mes_dashboard.core.cache_updater as cu
with patch.object(cu, 'read_sql_df', return_value=None):
updater = cu.CacheUpdater()
result = updater._load_full_table()
assert result is None
def test_load_full_table_handles_exception(self):
"""Test _load_full_table handles exception."""
import mes_dashboard.core.cache_updater as cu
with patch.object(cu, 'read_sql_df', side_effect=Exception("Database error")):
updater = cu.CacheUpdater()
result = updater._load_full_table()
assert result is None
class TestUpdateRedisCache:
"""Test Redis cache update logic."""
def test_update_redis_cache_success(self):
"""Test _update_redis_cache updates cache correctly."""
import mes_dashboard.core.cache_updater as cu
mock_client = MagicMock()
mock_pipeline = MagicMock()
mock_client.pipeline.return_value = mock_pipeline
test_df = pd.DataFrame({
'LOTID': ['LOT001'],
'QTY': [100]
})
with patch.object(cu, 'get_redis_client', return_value=mock_client):
with patch.object(cu, 'get_key', side_effect=lambda k: f'mes_wip:{k}'):
updater = cu.CacheUpdater()
result = updater._update_redis_cache(test_df, '2024-01-15 10:30:00')
assert result is True
mock_pipeline.execute.assert_called_once()
def test_update_redis_cache_no_client(self):
"""Test _update_redis_cache handles no client."""
import mes_dashboard.core.cache_updater as cu
test_df = pd.DataFrame({'LOTID': ['LOT001']})
with patch.object(cu, 'get_redis_client', return_value=None):
updater = cu.CacheUpdater()
result = updater._update_redis_cache(test_df, '2024-01-15')
assert result is False
class TestCacheUpdateFlow:
"""Test complete cache update flow."""
def test_no_update_when_sys_date_unchanged(self):
"""Test cache doesn't update when SYS_DATE unchanged."""
import mes_dashboard.core.cache_updater as cu
mock_df = pd.DataFrame({'SYS_DATE': ['2024-01-15 10:30:00']})
mock_client = MagicMock()
mock_client.get.return_value = '2024-01-15 10:30:00'
with patch.object(cu, 'read_sql_df', return_value=mock_df):
with patch.object(cu, 'redis_available', return_value=True):
with patch.object(cu, 'get_redis_client', return_value=mock_client):
with patch.object(cu, 'get_key', side_effect=lambda k: f'mes_wip:{k}'):
updater = cu.CacheUpdater()
# Simulate already having cached the same date
result = updater._check_and_update(force=False)
# No update because dates match
assert result is False
def test_update_when_sys_date_changes(self):
"""Test cache updates when SYS_DATE changes."""
import mes_dashboard.core.cache_updater as cu
updater = cu.CacheUpdater()
mock_df = pd.DataFrame({'SYS_DATE': ['2024-01-15 11:00:00']})
with patch.object(cu, 'read_sql_df', return_value=mock_df):
current_date = updater._check_sys_date()
old_date = '2024-01-15 10:30:00'
needs_update = current_date != old_date
assert needs_update is True
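The decision logic exercised by `TestCacheUpdateFlow` boils down to a single comparison. A minimal sketch, assuming the same semantics the tests assert (function name `needs_refresh` is illustrative; the real logic lives inside `CacheUpdater._check_and_update`):

```python
def needs_refresh(current_sys_date, cached_sys_date, force=False):
    """Decide whether the Redis cache must be rebuilt.

    Reload the full table only when the view's SYS_DATE has moved past the
    cached value, when forced, or skip entirely if Oracle gave no answer.
    """
    if force:
        return True
    if current_sys_date is None:  # Oracle unreachable or empty result
        return False
    return current_sys_date != cached_sys_date
```

This matches the flow tests: identical dates mean no update, a newer SYS_DATE triggers one.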

tests/test_redis_client.py Normal file
@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
"""Unit tests for Redis client module.
Tests Redis connection management with mocked Redis.
"""
import pytest
from unittest.mock import patch, MagicMock
import importlib
class TestRedisClient:
"""Test Redis client connection management."""
@pytest.fixture(autouse=True)
def reset_module(self):
"""Reset module state before each test."""
import mes_dashboard.core.redis_client as rc
rc._REDIS_CLIENT = None
yield
rc._REDIS_CLIENT = None
def test_get_redis_client_success(self, reset_module):
"""Test successful Redis client creation."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_ENABLED', True):
with patch.object(rc.redis.Redis, 'from_url') as mock_from_url:
mock_client = MagicMock()
mock_client.ping.return_value = True
mock_from_url.return_value = mock_client
client = rc.get_redis_client()
assert client is mock_client
mock_from_url.assert_called_once()
def test_get_redis_client_disabled(self, reset_module):
"""Test Redis client returns None when disabled."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_ENABLED', False):
client = rc.get_redis_client()
assert client is None
def test_get_redis_client_connection_error(self, reset_module):
"""Test Redis client handles connection errors gracefully."""
import mes_dashboard.core.redis_client as rc
import redis as redis_lib
with patch.object(rc, 'REDIS_ENABLED', True):
with patch.object(rc.redis.Redis, 'from_url') as mock_from_url:
mock_from_url.side_effect = redis_lib.RedisError("Connection refused")
client = rc.get_redis_client()
assert client is None
def test_redis_available_true(self, reset_module):
"""Test redis_available returns True when Redis is connected."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_ENABLED', True):
with patch.object(rc.redis.Redis, 'from_url') as mock_from_url:
mock_client = MagicMock()
mock_client.ping.return_value = True
mock_from_url.return_value = mock_client
assert rc.redis_available() is True
def test_redis_available_disabled(self, reset_module):
"""Test redis_available returns False when disabled."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_ENABLED', False):
assert rc.redis_available() is False
def test_get_key_with_prefix(self):
"""Test get_key adds prefix correctly."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_KEY_PREFIX', 'test_prefix'):
key = rc.get_key('mykey')
assert key == 'test_prefix:mykey'
def test_get_key_without_prefix(self):
"""Test get_key works with empty prefix."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_KEY_PREFIX', ''):
key = rc.get_key('mykey')
assert key == ':mykey'
class TestRedisClientSingleton:
"""Test Redis client singleton behavior."""
@pytest.fixture(autouse=True)
def reset_module(self):
"""Reset module state before each test."""
import mes_dashboard.core.redis_client as rc
rc._REDIS_CLIENT = None
yield
rc._REDIS_CLIENT = None
def test_client_is_singleton(self, reset_module):
"""Test that get_redis_client returns same instance."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_ENABLED', True):
with patch.object(rc.redis.Redis, 'from_url') as mock_from_url:
mock_client = MagicMock()
mock_client.ping.return_value = True
mock_from_url.return_value = mock_client
client1 = rc.get_redis_client()
client2 = rc.get_redis_client()
assert client1 is client2
# from_url should only be called once
assert mock_from_url.call_count == 1
class TestCloseRedis:
"""Test Redis client cleanup."""
@pytest.fixture(autouse=True)
def reset_module(self):
"""Reset module state before each test."""
import mes_dashboard.core.redis_client as rc
rc._REDIS_CLIENT = None
yield
rc._REDIS_CLIENT = None
def test_close_redis(self, reset_module):
"""Test close_redis properly closes connection."""
import mes_dashboard.core.redis_client as rc
with patch.object(rc, 'REDIS_ENABLED', True):
with patch.object(rc.redis.Redis, 'from_url') as mock_from_url:
mock_client = MagicMock()
mock_client.ping.return_value = True
mock_from_url.return_value = mock_client
# Get client first
client = rc.get_redis_client()
assert client is not None
# Close it
rc.close_redis()
# Verify close was called
mock_client.close.assert_called_once()
assert rc._REDIS_CLIENT is None
def test_close_redis_when_none(self, reset_module):
"""Test close_redis does nothing when no client."""
import mes_dashboard.core.redis_client as rc
# Should not raise any errors
rc.close_redis()
assert rc._REDIS_CLIENT is None
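The singleton and key-prefix behavior verified above follows a common pattern. A self-contained sketch of that pattern (generic `get_client`/`factory` names are illustrative; the real module wraps `redis.Redis.from_url` and the `REDIS_ENABLED`/`REDIS_KEY_PREFIX` settings):

```python
_CLIENT = None  # module-level cache, mirroring _REDIS_CLIENT


def get_client(factory, enabled=True):
    """Create the client at most once and reuse it; return None when disabled."""
    global _CLIENT
    if not enabled:
        return None
    if _CLIENT is None:
        _CLIENT = factory()
    return _CLIENT


def get_key(key, prefix='mes_wip'):
    """Namespace a cache key, e.g. 'data' -> 'mes_wip:data'."""
    return f'{prefix}:{key}'
```

As in `test_client_is_singleton`, the factory runs exactly once no matter how many times `get_client` is called.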