feat: 新增機台稼動歷史分析功能

新增 Resource History Analysis 功能模組:
- 後端服務:query_summary、query_detail、export_csv 查詢函數
- API 路由:/api/resource/history/* 端點 (options/summary/detail/export)
- 前端頁面:KPI 卡片、OU% 趨勢圖、E10 狀態堆疊圖、熱力圖、階層式表格
- 支援時間粒度切換(日/週/月/年)與多維度篩選
- 查詢範圍擴展至 730 天(兩年)

其他改進:
- 新增 filter_cache 服務統一管理工站與型號快取
- MesApi 修復 JSON 解析失敗時誤觸重試的問題
- 新增 _safe_float() 處理 NaN 值避免 JSON 序列化錯誤
- E10 狀態分布圖表 tooltip 顯示各狀態百分比
- 新增完整測試套件(單元/整合/E2E)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2026-01-29 08:53:32 +08:00
parent e21d736b3e
commit d5f0df3384
19 changed files with 4900 additions and 1448 deletions

58
claude.md Normal file
View File

@@ -0,0 +1,58 @@
# MES Dashboard - Claude Code Instructions
## Project Overview
MES Dashboard 是一個工廠製造執行系統的儀表板應用,使用 Flask + Oracle 資料庫 + ECharts 前端圖表。
## Before Starting Any Task
1. **Review architecture documentation**: Read `docs/architecture_findings.md` to understand:
- Database connection management patterns
- Caching mechanisms and TTL constants
- Filter cache (workcenter/family) usage
- Frontend global components (Toast, MesApi)
- Data table filtering rules and column mappings
- E10 status definitions and OU% calculation
- Testing conventions
## When Making Changes
If any of the following patterns are modified or new patterns are discovered:
- Database connection or pooling approach
- Caching strategy or TTL values
- Global frontend components usage
- Data table column names or filtering rules
- New shared utilities or services
- Testing conventions or setup patterns
**Update `docs/architecture_findings.md`** to reflect the changes.
## Key Architecture Rules
### Database
- Always use `mes_dashboard.core.database.read_sql_df()` for queries
- Never create direct connections in services
- Reset `db._ENGINE = None` in test setUp
### Caching
- Use `mes_dashboard.core.cache` for all caching operations
- Use `mes_dashboard.services.filter_cache` for workcenter/family lookups
- Always convert WORKCENTERNAME → WORKCENTER_GROUP for display
### Frontend
- Toast notifications: Use `Toast.warning()`, `Toast.error()`, `Toast.success()` (NOT MESToast)
- API calls: Use `MesApi.get()` with proper timeout
- Array operations: Remember `.reverse()` modifies in-place
### Data Tables
- DW_MES_RESOURCE: Use `PJ_ASSETSSTATUS` (not ASSETSTATUS), `LOCATIONNAME` (not LOCATION)
- DW_MES_RESOURCESTATUS_SHIFT: HISTORYID maps to RESOURCEID
- DW_PJ_LOT_V: Source for WORKCENTER_GROUP mapping
### SQL
- Use `/*+ MATERIALIZE */` hint for Oracle CTEs used multiple times
- Date range: `TXNDATE >= start AND TXNDATE < end + 1`
- Apply EQUIPMENT_TYPE_FILTER, location exclusions, asset status exclusions
## Testing
- Unit tests: `tests/test_*_service.py`
- Integration tests: `tests/test_*_routes.py`
- E2E tests: `tests/e2e/test_*_e2e.py`
- For parallel queries (ThreadPoolExecutor), mock with function-based side_effect, not list

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,310 @@
# MES Dashboard - Architecture Findings
本文件記錄專案開發過程中確立的架構設計、全局規範與資料處理規則。
---
## 1. 資料庫連線管理
### 連線池統一使用
所有資料庫操作必須透過 `mes_dashboard.core.database` 模組:
```python
from mes_dashboard.core.database import read_sql_df, get_engine
# 讀取資料
df = read_sql_df(sql)
# 取得 engine若需要直接操作
engine = get_engine()
```
### 注意事項
- **禁止**在各 service 中自行建立連線
- 連線池由 `database.py` 統一管理,避免連線洩漏
- 測試環境需在 setUp 中重置:`db._ENGINE = None`
---
## 2. 快取機制
### 全局快取 API
使用 `mes_dashboard.core.cache` 模組:
```python
from mes_dashboard.core.cache import cache_get, cache_set, make_cache_key
# 建立快取 key支援 filters dict
cache_key = make_cache_key("resource_history_summary", filters={
'start_date': start_date,
'workcenter_groups': sorted(groups) if groups else None,
})
# 讀取/寫入快取
result = cache_get(cache_key)
if result is None:
result = query_data()
cache_set(cache_key, result, ttl=CACHE_TTL_TREND)
```
### 快取 TTL 常數
定義於 `mes_dashboard.config.constants`
- `CACHE_TTL_FILTER_OPTIONS`: 篩選選項(較長)
- `CACHE_TTL_TREND`: 趨勢資料(中等)
- `CACHE_TTL_REALTIME`: 即時資料(較短)
---
## 3. Filter Cache篩選選項快取
### 位置
`mes_dashboard.services.filter_cache`
### 用途
快取全站共用的篩選選項,避免重複查詢資料庫:
```python
from mes_dashboard.services.filter_cache import (
get_workcenter_groups, # 取得 workcenter group 列表
get_workcenter_mapping, # 取得 workcentername → group 對應
get_workcenters_for_groups, # 根據 group 取得 workcentername 列表
get_resource_families, # 取得 resource family 列表
)
```
### Workcenter 對應關係
```
WORKCENTERNAME (資料庫) → WORKCENTER_GROUP (顯示)
焊接_DB_1 → 焊接_DB
焊接_DB_2 → 焊接_DB
成型_1 → 成型
```
### 資料來源
- Workcenter Groups: `DW_PJ_LOT_V` (WORKCENTER_GROUP, WORKCENTERSEQUENCE_GROUP)
- Resource Families: `DW_MES_RESOURCE` (RESOURCEFAMILYNAME)
---
## 4. 前端全局組件
### Toast 通知
定義於 `static/js/toast.js`,透過 `_base.html` 載入:
```javascript
// 正確用法
Toast.info('訊息');
Toast.success('成功');
Toast.warning('警告');
Toast.error('錯誤');
Toast.loading('載入中...');
// 錯誤用法(不存在)
MESToast.warning('...'); // ❌ 錯誤
```
### MesApiHTTP 請求)
定義於 `static/js/api.js`,提供統一的 API 呼叫介面:
```javascript
const result = await MesApi.get('/api/endpoint', { timeout: 30000 });
if (result.success) {
// 處理資料
} else {
Toast.error(result.error);
}
```
---
## 5. 資料表預篩選規則
### 設備類型篩選
定義於 `mes_dashboard.config.constants.EQUIPMENT_TYPE_FILTER`
```sql
-- 只查詢特定設備類型
r.EQUIPMENTTYPE IN ('主要設備', '輔助設備')
```
### 排除條件
```python
# 排除的地點
EXCLUDED_LOCATIONS = ['TEST', 'LAB', ...]
# 排除的資產狀態
EXCLUDED_ASSET_STATUSES = ['報廢', '停用', ...]
```
### SQL 範例
```python
# 建立篩選條件
location_filter = _build_location_filter('r')
# → AND (r.LOCATIONNAME IS NULL OR r.LOCATIONNAME NOT IN ('TEST', 'LAB'))
asset_status_filter = _build_asset_status_filter('r')
# → AND r.PJ_ASSETSSTATUS NOT IN ('報廢', '停用')
```
---
## 6. 資料庫欄位對應
### DW_MES_RESOURCE
| 常見錯誤 | 正確欄位名 |
|---------|-----------|
| ASSETSTATUS | PJ_ASSETSSTATUS雙 S|
| LOCATION | LOCATIONNAME |
| ISPRODUCTION | PJ_ISPRODUCTION |
| ISKEY | PJ_ISKEY |
| ISMONITOR | PJ_ISMONITOR |
### DW_MES_RESOURCESTATUS_SHIFT
| 欄位 | 說明 |
|-----|------|
| HISTORYID | 對應 DW_MES_RESOURCE.RESOURCEID |
| TXNDATE | 交易日期 |
| OLDSTATUSNAME | E10 狀態 (PRD, SBY, UDT, SDT, EGT, NST) |
| HOURS | 該狀態時數 |
### DW_PJ_LOT_V
| 欄位 | 說明 |
|-----|------|
| WORKCENTERNAME | 站點名稱(細分)|
| WORKCENTER_GROUP | 站點群組(顯示用)|
| WORKCENTERSEQUENCE_GROUP | 群組排序 |
---
## 7. E10 狀態定義
| 狀態 | 說明 | 計入 OU% |
|-----|------|---------|
| PRD | Production生產| 是(分子)|
| SBY | Standby待機| 是(分母)|
| UDT | Unscheduled Downtime非計畫停機| 是(分母)|
| SDT | Scheduled Downtime計畫停機| 是(分母)|
| EGT | Engineering Time工程時間| 是(分母)|
| NST | Non-Scheduled Time非排程時間| 否 |
### OU% 計算公式
```
OU% = PRD / (PRD + SBY + UDT + SDT + EGT) × 100
```
---
## 8. 平行查詢
### ThreadPoolExecutor
對於多個獨立查詢,使用平行執行提升效能:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(read_sql_df, kpi_sql): 'kpi',
executor.submit(read_sql_df, trend_sql): 'trend',
executor.submit(read_sql_df, heatmap_sql): 'heatmap',
executor.submit(read_sql_df, comparison_sql): 'comparison',
}
for future in as_completed(futures):
query_name = futures[future]
results[query_name] = future.result()
```
### 注意事項
- Mock 測試時不能使用 `side_effect` 列表(順序不可預測)
- 應使用函式判斷 SQL 內容來回傳對應的 mock 資料
---
## 9. Oracle SQL 優化
### CTE MATERIALIZE Hint
防止 Oracle 優化器將 CTE inline 多次執行:
```sql
WITH shift_data AS (
SELECT /*+ MATERIALIZE */ HISTORYID, TXNDATE, OLDSTATUSNAME, HOURS
FROM DW_MES_RESOURCESTATUS_SHIFT
WHERE TXNDATE >= TO_DATE('2024-01-01', 'YYYY-MM-DD')
AND TXNDATE < TO_DATE('2024-01-07', 'YYYY-MM-DD') + 1
)
SELECT ...
```
### 日期範圍查詢
```sql
-- 包含 end_date 當天
WHERE TXNDATE >= TO_DATE('{start_date}', 'YYYY-MM-DD')
AND TXNDATE < TO_DATE('{end_date}', 'YYYY-MM-DD') + 1
```
---
## 10. 前端資料限制
### 明細資料上限
為避免瀏覽器記憶體問題,明細查詢有筆數限制:
```python
MAX_DETAIL_RECORDS = 5000
if total > MAX_DETAIL_RECORDS:
df = df.head(MAX_DETAIL_RECORDS)
truncated = True
```
前端顯示警告:
```javascript
if (result.truncated) {
Toast.warning(`資料超過 ${result.max_records} 筆,請使用篩選條件縮小範圍。`);
}
```
---
## 11. JavaScript 注意事項
### Array.reverse() 原地修改
```javascript
// 錯誤 - 原地修改陣列
const arr = [1, 2, 3];
arr.reverse(); // arr 被修改為 [3, 2, 1]
// 正確 - 建立新陣列
const reversed = arr.slice().reverse(); // arr 不變
// 或
const reversed = [...arr].reverse();
```
---
## 12. 測試規範
### 測試檔案結構
```
tests/
├── test_*_service.py # 單元測試service layer
├── test_*_routes.py # 整合測試API endpoints
└── e2e/
└── test_*_e2e.py # 端對端測試(完整流程)
```
### 測試前重置
```python
def setUp(self):
db._ENGINE = None # 重置連線池
self.app = create_app('testing')
```
### 執行測試
```bash
# 單一模組
pytest tests/test_resource_history_service.py -v
# 全部相關測試
pytest tests/test_resource_history_*.py tests/e2e/test_resource_history_e2e.py -v
```

View File

@@ -0,0 +1,177 @@
## Context
目前系統的 /resource 頁面混合即時與歷史資料展示,歷史分析功能受限於現有架構。資料來源為:
- **DW_MES_RESOURCESTATUS_SHIFT**:班次級狀態彙總表(約 74M 筆),包含 HOURS 欄位可直接計算各狀態工時
- **DW_MES_RESOURCE**:機台維度資料(約 90K 筆),包含 WORKCENTERNAME、RESOURCEFAMILYNAME、RESOURCENAME 等維度
現有 `dashboard_service.py` 已有 `query_ou_trend()``query_utilization_heatmap()` 函數可參考,使用相同的 OU% 計算公式:`PRD / (PRD + SBY + EGT + SDT + UDT) * 100`
## Goals / Non-Goals
**Goals:**
- 建立獨立的歷史分析頁面,支援多維度、多時間粒度的機台效能分析
- 實現三層階層式下鑽:站點 → 型號 → 個別機台
- 提供完整的 SEMI E10 設備狀態分析(時數 + 佔比)
- 支援日/週/月/年的時間粒度切換
- 採用查詢觸發模式,避免頁面載入時的效能衝擊
**Non-Goals:**
- 不修改現有 /resource 頁面的即時機況功能(僅移除歷史圖表)
- 不實作即時資料推送或自動刷新
- 不整合其他資料來源(如 SECS/GEM 設備訊號)
- 不實作機台詳細事件時間軸(甘特圖)—— 可作為後續擴充
## Decisions
### 1. 頁面架構:完全獨立頁面
**決定**:建立 `/resource-history` 作為完全獨立的頁面
**替代方案**
- (A) 在 /resource 頁面新增 Tab 切換 → 會增加頁面複雜度,且兩者篩選條件不同
- (B) 子路由 `/resource/history` → 與現有 /resource 頁面產生關聯,不符合獨立需求
**理由**:完全獨立的頁面便於維護,不影響現有 /resource 即時機況功能。
### 2. 資料服務:獨立 Service 模組
**決定**:建立 `resource_history_service.py` 獨立模組
**替代方案**
- (A) 擴展現有 `resource_service.py` → 會使檔案過大且職責混淆
- (B) 擴展 `dashboard_service.py` → 該模組已有多個功能,不適合再擴展
**理由**:單一職責原則,便於維護與測試。可重用 `dashboard_service.py` 中的 OU 計算邏輯。
### 3. 時間粒度處理SQL 層聚合
**決定**:在 SQL 查詢中使用 `TRUNC()` 進行時間聚合
```sql
-- 日TRUNC(TXNDATE)
-- 週TRUNC(TXNDATE, 'IW') -- ISO week
-- 月TRUNC(TXNDATE, 'MM')
-- 年TRUNC(TXNDATE, 'YYYY')
```
**替代方案**
- (A) Python 層聚合 → 需拉取更多資料,效能差
- (B) 預計算彙總表 → 需額外 ETL 流程,增加維護成本
**理由**:利用 Oracle 原生函數在資料庫層高效聚合,減少網路傳輸。
### 4. 階層式資料結構:單次查詢 + 前端組裝
**決定**:後端回傳扁平化資料,包含 WORKCENTERNAME、RESOURCEFAMILYNAME、RESOURCENAME 三個維度欄位,前端根據需要進行階層組裝
**替代方案**
- (A) 後端回傳巢狀 JSON → 結構複雜,不利於匯出
- (B) 三次獨立查詢(各層級) → 網路請求多,延遲增加
**理由**:單次查詢減少延遲,扁平結構便於表格渲染與匯出,前端可靈活控制展開/收合邏輯。
### 5. 圖表實作ECharts
**決定**:沿用現有 ECharts 套件
**理由**:與現有頁面一致,減少學習成本和套件依賴。已有 OU 趨勢圖和熱力圖的實作可參考。
### 6. 匯出功能CSV 格式
**決定**:提供 CSV 匯出,由後端生成
**替代方案**
- (A) Excel 格式 → 需額外套件openpyxl增加依賴
- (B) 前端匯出 → 資料量大時效能問題
**理由**CSV 輕量且通用,後端處理可支援大量資料。
## Risks / Trade-offs
| 風險 | 影響 | 緩解措施 |
|------|------|----------|
| 大量資料查詢效能 | 74M 筆資料表的聚合查詢可能緩慢 | 強制要求日期範圍限制(最多 1 年);使用 TXNDATE 索引;考慮查詢超時設定 |
| 前端渲染效能 | 大量機台明細可能導致表格卡頓 | 實作分頁或虛擬捲動;限制單次回傳筆數(如 1000 筆) |
| 記憶體使用 | pandas DataFrame 處理大量資料 | 使用 chunked 讀取或直接串流輸出 |
| 使用者誤操作 | 選擇過長時間範圍導致查詢卡住 | 前端驗證時間範圍;顯示預估資料量警告 |
## API 設計
### 主要 API 端點
```
GET /api/resource/history/summary
?start_date=2024-01-01
&end_date=2024-01-31
&granularity=day|week|month|year
&workcenter=WC01 (optional)
&family=FAM01 (optional)
&is_production=1 (optional)
&is_key=1 (optional)
Response: {
kpi: { ou_pct, prd_hours, sby_hours, udt_hours, sdt_hours, egt_hours, nst_hours, machine_count },
trend: [{ date, ou_pct, prd_hours, ... }],
heatmap: [{ workcenter, date, ou_pct }],
workcenter_comparison: [{ workcenter, ou_pct, prd_hours, ... }]
}
GET /api/resource/history/detail
?start_date=2024-01-01
&end_date=2024-01-31
&granularity=day|week|month|year
&workcenter=WC01 (optional)
&family=FAM01 (optional)
&page=1
&page_size=100
Response: {
data: [{ workcenter, family, resource, ou_pct, prd_hours, prd_pct, sby_hours, sby_pct, ... }],
total: 1234,
page: 1,
page_size: 100
}
GET /api/resource/history/export
?start_date=2024-01-01
&end_date=2024-01-31
&granularity=day
&format=csv
Response: CSV file download
```
## 前端元件結構
```
resource_history.html
├── Filter Bar
│ ├── Date Range Picker (start_date, end_date)
│ ├── Granularity Buttons (日/週/月/年)
│ ├── Workcenter Select (多選)
│ ├── Family Select (多選)
│ ├── Checkbox Filters (生產機/關鍵機/監控機)
│ └── Query Button
├── KPI Cards Row
│ ├── OU% Card
│ ├── PRD Hours Card
│ ├── UDT Hours Card
│ ├── SDT Hours Card
│ ├── EGT Hours Card
│ └── Machine Count Card
├── Charts Row 1
│ ├── OU% Trend Line Chart
│ └── E10 Stacked Bar Chart
├── Charts Row 2
│ ├── Workcenter Comparison Bar Chart
│ └── Utilization Heatmap
└── Detail Table
├── Toolbar (Export, Expand All)
└── Hierarchical Table
├── Workcenter Level (expandable)
│ ├── Family Level (expandable)
│ │ └── Resource Level
```

View File

@@ -0,0 +1,65 @@
## Why
目前缺乏機台歷史效能的深度分析工具。使用者需要:
- 分析各站點、各型號的歷史稼動率趨勢
- 了解 SEMI E10 設備狀態PRD、SBY、UDT、SDT、EGT、NST的時間分布
- 支援日/週/月/年等多時間粒度的效能分析
- 從站點下鑽至型號、再到個別機台的詳細數據
## What Changes
### 新增機台歷史表現分析頁面
建立完全獨立的歷史分析頁面 `/resource-history`,提供:
1. **篩選條件區**
- 日期範圍選擇器
- 時間粒度切換(日/週/月/年)
- 站點Workcenter下拉選單
- 機台型號Resource Family下拉選單
- 查詢按鈕(預設不載入資料,需使用者主動觸發)
2. **KPI 摘要卡片**
- OU%(整體稼動率)
- PRD 時數(生產時間)
- UDT/SDT/EGT 時數(各類停機時間)
- 機台數量
3. **分析圖表**
- OU% 趨勢折線圖(含時間軸)
- E10 狀態堆疊長條圖(各狀態時數分布)
- 工站 OU% 對比水平條形圖
- 設備狀態熱力圖(站點 × 時間)
4. **明細表格**
- 階層式展開:站點 → 型號 → 個別機台
- 欄位OU%、PRD時數/佔比、SBY時數/佔比、UDT時數/佔比、SDT時數/佔比、EGT時數/佔比、NST時數/佔比)、機台數
- 支援匯出功能
## Capabilities
### New Capabilities
- `resource-history-page`: 機台歷史表現分析頁面包含篩選、KPI、圖表、明細表格
- `resource-history-service`: 歷史資料查詢服務,支援多維度聚合與階層式資料結構
### Modified Capabilities
- (無需修改現有 spec/resource 頁面的調整為實作層級變更)
## Impact
- **新增檔案**:
- `src/mes_dashboard/templates/resource_history.html` - 歷史分析頁面模板
- `src/mes_dashboard/routes/resource_history_routes.py` - 歷史分析路由
- `src/mes_dashboard/services/resource_history_service.py` - 歷史資料查詢服務
- **修改檔案**:
- `src/mes_dashboard/__init__.py` - 註冊新路由
- **資料來源**:
- `DW_MES_RESOURCESTATUS_SHIFT` - 機台狀態班別資料(約 74M 筆)
- `DW_MES_RESOURCE` - 機台維度資料
- **向後相容**:本變更為純新增功能,不影響任何現有頁面

View File

@@ -0,0 +1,253 @@
## ADDED Requirements
### Requirement: 頁面路由與存取
系統 SHALL 提供 `/resource-history` 路由存取機台歷史表現分析頁面。
#### Scenario: 存取歷史分析頁面
- **WHEN** 使用者導航至 `/resource-history`
- **THEN** 系統顯示機台歷史表現分析頁面
#### Scenario: 頁面初始狀態
- **WHEN** 頁面首次載入
- **THEN** 系統顯示篩選條件區但不載入任何資料
- **THEN** 圖表和表格區域顯示「請設定查詢條件」提示
---
### Requirement: 日期範圍篩選
系統 SHALL 提供日期範圍選擇器,允許使用者指定查詢的起迄日期。
#### Scenario: 選擇日期範圍
- **WHEN** 使用者設定開始日期為 2024-01-01結束日期為 2024-01-31
- **THEN** 系統記錄查詢範圍為該期間
#### Scenario: 日期範圍限制
- **WHEN** 使用者選擇超過 365 天的日期範圍
- **THEN** 系統顯示警告訊息「查詢範圍不可超過一年」
- **THEN** 系統阻止查詢執行
#### Scenario: 預設日期範圍
- **WHEN** 頁面載入時
- **THEN** 日期範圍預設為最近 7 天(不含今日)
---
### Requirement: 時間粒度切換
系統 SHALL 提供日/週/月/年四種時間粒度選項,用於控制資料聚合方式。
#### Scenario: 切換至日粒度
- **WHEN** 使用者選擇「日」粒度
- **THEN** 後續查詢以每日為單位聚合資料
#### Scenario: 切換至週粒度
- **WHEN** 使用者選擇「週」粒度
- **THEN** 後續查詢以 ISO 週為單位聚合資料
#### Scenario: 切換至月粒度
- **WHEN** 使用者選擇「月」粒度
- **THEN** 後續查詢以每月為單位聚合資料
#### Scenario: 切換至年粒度
- **WHEN** 使用者選擇「年」粒度
- **THEN** 後續查詢以每年為單位聚合資料
#### Scenario: 預設粒度
- **WHEN** 頁面載入時
- **THEN** 時間粒度預設為「日」
---
### Requirement: 站點與型號篩選
系統 SHALL 提供站點Workcenter和機台型號Resource Family下拉選單進行資料篩選。
#### Scenario: 篩選特定站點
- **WHEN** 使用者從站點下拉選單選擇「WC01」
- **THEN** 查詢結果僅包含該站點的資料
#### Scenario: 篩選特定型號
- **WHEN** 使用者從型號下拉選單選擇「FAM01」
- **THEN** 查詢結果僅包含該型號的資料
#### Scenario: 組合篩選
- **WHEN** 使用者同時選擇站點「WC01」和型號「FAM01」
- **THEN** 查詢結果僅包含同時符合兩個條件的資料
#### Scenario: 動態載入篩選選項
- **WHEN** 頁面載入時
- **THEN** 系統從資料庫載入可用的站點和型號列表
---
### Requirement: 設備旗標篩選
系統 SHALL 提供生產機、關鍵機、監控機三個 checkbox 篩選選項。
#### Scenario: 篩選生產機
- **WHEN** 使用者勾選「生產機」checkbox
- **THEN** 查詢結果僅包含 PJ_ISPRODUCTION = 1 的機台
#### Scenario: 篩選關鍵機
- **WHEN** 使用者勾選「關鍵機」checkbox
- **THEN** 查詢結果僅包含 PJ_ISKEY = 1 的機台
#### Scenario: 篩選監控機
- **WHEN** 使用者勾選「監控機」checkbox
- **THEN** 查詢結果僅包含 PJ_ISMONITOR = 1 的機台
---
### Requirement: 查詢觸發
系統 SHALL 提供查詢按鈕,使用者點擊後才執行資料查詢。
#### Scenario: 執行查詢
- **WHEN** 使用者設定完篩選條件後點擊「查詢」按鈕
- **THEN** 系統根據篩選條件執行查詢
- **THEN** 系統顯示載入指示器
- **THEN** 查詢完成後更新 KPI、圖表、表格
#### Scenario: 查詢失敗處理
- **WHEN** 查詢執行失敗(如網路錯誤、超時)
- **THEN** 系統顯示錯誤訊息 toast 通知
- **THEN** 系統隱藏載入指示器
---
### Requirement: KPI 摘要卡片
系統 SHALL 顯示 6 個 KPI 摘要卡片OU%、PRD 時數、UDT 時數、SDT 時數、EGT 時數、機台數。
#### Scenario: 顯示 OU%
- **WHEN** 查詢完成
- **THEN** 系統顯示查詢範圍內的整體 OU%
- **THEN** OU% 計算公式為 PRD / (PRD + SBY + EGT + SDT + UDT) * 100
#### Scenario: 顯示各狀態時數
- **WHEN** 查詢完成
- **THEN** 系統顯示 PRD、UDT、SDT、EGT 的總時數(小時)
#### Scenario: 顯示機台數
- **WHEN** 查詢完成
- **THEN** 系統顯示符合篩選條件的不重複機台數量
---
### Requirement: OU% 趨勢折線圖
系統 SHALL 顯示 OU% 隨時間變化的折線圖。
#### Scenario: 顯示趨勢圖
- **WHEN** 查詢完成
- **THEN** 系統顯示 X 軸為時間、Y 軸為 OU% 的折線圖
- **THEN** X 軸根據時間粒度顯示對應的日期標籤
#### Scenario: 圖表互動
- **WHEN** 使用者將滑鼠移至圖表上的數據點
- **THEN** 系統顯示該時間點的詳細數值日期、OU%、PRD 時數等)
---
### Requirement: E10 狀態堆疊長條圖
系統 SHALL 顯示各時間點的 E10 設備狀態時數分布堆疊長條圖。
#### Scenario: 顯示堆疊圖
- **WHEN** 查詢完成
- **THEN** 系統顯示 X 軸為時間、Y 軸為時數的堆疊長條圖
- **THEN** 每個長條包含 PRD、SBY、UDT、SDT、EGT、NST 六種狀態的堆疊
#### Scenario: 圖表圖例
- **WHEN** 圖表顯示時
- **THEN** 系統顯示各狀態的顏色圖例
- **THEN** 使用者可點擊圖例切換該狀態的顯示/隱藏
---
### Requirement: 工站 OU% 對比水平條形圖
系統 SHALL 顯示各站點 OU% 的水平條形圖比較。
#### Scenario: 顯示對比圖
- **WHEN** 查詢完成
- **THEN** 系統顯示各站點的 OU% 水平條形圖
- **THEN** 條形圖按 OU% 由高到低排序
---
### Requirement: 設備狀態熱力圖
系統 SHALL 顯示站點 × 時間的 OU% 熱力圖。
#### Scenario: 顯示熱力圖
- **WHEN** 查詢完成
- **THEN** 系統顯示 X 軸為時間、Y 軸為站點的熱力圖
- **THEN** 顏色深淺表示該站點在該時間的 OU%
#### Scenario: 熱力圖顏色編碼
- **WHEN** 熱力圖顯示時
- **THEN** OU% 高(> 80%)顯示綠色
- **THEN** OU% 中50-80%)顯示黃色
- **THEN** OU% 低(< 50%顯示紅色
---
### Requirement: 階層式明細表格
系統 SHALL 顯示可展開的階層式明細表格支援站點 型號 個別機台三層結構
#### Scenario: 顯示站點層級
- **WHEN** 查詢完成
- **THEN** 表格預設顯示站點層級的彙總資料
- **THEN** 每列顯示展開/收合按鈕
#### Scenario: 展開至型號層級
- **WHEN** 使用者點擊站點列的展開按鈕
- **THEN** 系統顯示該站點下各型號的彙總資料
- **THEN** 型號列以縮排方式呈現
#### Scenario: 展開至機台層級
- **WHEN** 使用者點擊型號列的展開按鈕
- **THEN** 系統顯示該型號下各機台的詳細資料
- **THEN** 機台列以更深縮排方式呈現
#### Scenario: 全部展開
- **WHEN** 使用者點擊全部展開按鈕
- **THEN** 系統展開所有層級顯示完整明細
---
### Requirement: 表格欄位
系統 SHALL 在明細表格中顯示以下欄位站點/型號/機台OU%、PRD時數/佔比)、SBY時數/佔比)、UDT時數/佔比)、SDT時數/佔比)、EGT時數/佔比)、NST時數/佔比)、機台數
#### Scenario: 顯示時數與佔比
- **WHEN** 表格顯示資料時
- **THEN** E10 狀態欄位同時顯示時數小時和佔比百分比
- **THEN** 格式為123.4h (45.6%)」
#### Scenario: 機台數欄位
- **WHEN** 顯示站點或型號層級時
- **THEN** 機台數欄位顯示該群組的不重複機台數量
- **WHEN** 顯示機台層級時
- **THEN** 機台數欄位顯示 1
---
### Requirement: 資料匯出
系統 SHALL 提供 CSV 格式的資料匯出功能
#### Scenario: 匯出明細資料
- **WHEN** 使用者點擊匯出按鈕
- **THEN** 系統生成包含所有明細資料的 CSV 檔案
- **THEN** 瀏覽器自動下載該檔案
- **THEN** 檔案名稱包含查詢日期範圍
#### Scenario: 匯出大量資料
- **WHEN** 查詢結果超過 10000
- **THEN** 系統仍完整匯出所有資料
- **THEN** 系統顯示匯出進度提示

View File

@@ -0,0 +1,154 @@
## ADDED Requirements
### Requirement: 歷史資料摘要查詢
系統 SHALL 提供 API 端點 `GET /api/resource/history/summary` 查詢機台歷史效能摘要資料。
#### Scenario: 查詢日粒度摘要
- **WHEN** 呼叫 API 並傳入 start_date=2024-01-01, end_date=2024-01-07, granularity=day
- **THEN** 系統回傳該期間以日為單位聚合的 KPI、趨勢、熱力圖、站點比較資料
#### Scenario: 查詢週粒度摘要
- **WHEN** 呼叫 API 並傳入 granularity=week
- **THEN** 系統使用 TRUNC(TXNDATE, 'IW') 以 ISO 週為單位聚合資料
#### Scenario: 查詢月粒度摘要
- **WHEN** 呼叫 API 並傳入 granularity=month
- **THEN** 系統使用 TRUNC(TXNDATE, 'MM') 以月為單位聚合資料
#### Scenario: 查詢年粒度摘要
- **WHEN** 呼叫 API 並傳入 granularity=year
- **THEN** 系統使用 TRUNC(TXNDATE, 'YYYY') 以年為單位聚合資料
#### Scenario: 帶篩選條件查詢
- **WHEN** 呼叫 API 並傳入 workcenter=WC01, family=FAM01
- **THEN** 系統回傳僅符合該站點和型號的資料
#### Scenario: 回傳資料結構
- **WHEN** 查詢成功
- **THEN** 回傳 JSON 包含 kpi、trend、heatmap、workcenter_comparison 四個區塊
---
### Requirement: OU% 計算公式
系統 SHALL 使用標準 OU% 計算公式PRD / (PRD + SBY + EGT + SDT + UDT) * 100。
#### Scenario: 計算 OU%
- **WHEN** PRD=800, SBY=100, EGT=50, SDT=30, UDT=20
- **THEN** OU% = 800 / (800+100+50+30+20) * 100 = 80%
#### Scenario: 排除 NST
- **WHEN** 計算 OU% 時
- **THEN** 分母不包含 NSTNot Scheduled Time
#### Scenario: 處理零分母
- **WHEN** PRD + SBY + EGT + SDT + UDT = 0
- **THEN** OU% 回傳 0 而非錯誤
---
### Requirement: E10 狀態時數與佔比計算
系統 SHALL 計算各 E10 狀態PRD、SBY、UDT、SDT、EGT、NST的時數和佔比。
#### Scenario: 計算狀態時數
- **WHEN** 查詢特定期間的資料
- **THEN** 系統從 DW_MES_RESOURCESTATUS_SHIFT.HOURS 欄位聚合各狀態時數
#### Scenario: 計算狀態佔比
- **WHEN** 計算各狀態佔比
- **THEN** 佔比 = 該狀態時數 / 全部狀態時數總和 * 100
#### Scenario: 按狀態分組聚合
- **WHEN** 聚合資料時
- **THEN** 系統根據 OLDSTATUSNAME 欄位識別 PRD、SBY、UDT、SDT、EGT、NST
---
### Requirement: 階層式明細資料查詢
系統 SHALL 提供 API 端點 `GET /api/resource/history/detail` 查詢階層式明細資料。
#### Scenario: 查詢明細資料
- **WHEN** 呼叫 API 並傳入日期範圍和粒度
- **THEN** 系統回傳包含 WORKCENTERNAME、RESOURCEFAMILYNAME、RESOURCENAME 三個維度的扁平化資料
#### Scenario: 分頁查詢
- **WHEN** 呼叫 API 並傳入 page=2, page_size=100
- **THEN** 系統回傳第 101-200 筆資料
- **THEN** 回傳包含 total 總筆數供前端分頁
#### Scenario: 回傳欄位
- **WHEN** 查詢成功
- **THEN** 每筆資料包含workcenter、family、resource、ou_pct、prd_hours、prd_pct、sby_hours、sby_pct、udt_hours、udt_pct、sdt_hours、sdt_pct、egt_hours、egt_pct、nst_hours、nst_pct、machine_count
---
### Requirement: 篩選選項查詢
系統 SHALL 提供 API 端點查詢可用的篩選選項。
#### Scenario: 查詢站點列表
- **WHEN** 頁面載入時呼叫篩選選項 API
- **THEN** 系統回傳所有可用的 WORKCENTERNAME 列表
#### Scenario: 查詢型號列表
- **WHEN** 頁面載入時呼叫篩選選項 API
- **THEN** 系統回傳所有可用的 RESOURCEFAMILYNAME 列表
---
### Requirement: 資料匯出服務
系統 SHALL 提供 API 端點 `GET /api/resource/history/export` 匯出 CSV 格式資料。
#### Scenario: 匯出 CSV
- **WHEN** 呼叫 API 並傳入 format=csv 和篩選條件
- **THEN** 系統回傳 Content-Type: text/csv 的檔案下載
- **THEN** 檔案包含所有符合條件的明細資料
#### Scenario: CSV 欄位
- **WHEN** 匯出 CSV 時
- **THEN** 包含欄位站點、型號、機台、OU%、PRD(h)、PRD(%)、SBY(h)、SBY(%)、UDT(h)、UDT(%)、SDT(h)、SDT(%)、EGT(h)、EGT(%)、NST(h)、NST(%)
#### Scenario: 處理大量資料匯出
- **WHEN** 匯出資料量超過 10000 筆
- **THEN** 系統使用串流方式輸出避免記憶體溢出
---
### Requirement: 資料來源與關聯
系統 SHALL 從 DW_MES_RESOURCESTATUS_SHIFT 表查詢歷史狀態資料,並關聯 DW_MES_RESOURCE 表取得機台維度資訊。
#### Scenario: 資料表關聯
- **WHEN** 查詢資料時
- **THEN** 系統使用 HISTORYID = RESOURCEID 關聯兩表
#### Scenario: 篩選條件
- **WHEN** 查詢資料時
- **THEN** 系統套用 OBJECTCATEGORY/OBJECTTYPE 篩選ASSEMBLY 或 WAFERSORT
- **THEN** 系統排除 EXCLUDED_LOCATIONS 和 EXCLUDED_ASSET_STATUSES 中定義的資料
#### Scenario: 時間範圍篩選
- **WHEN** 查詢資料時
- **THEN** 系統使用 TXNDATE 欄位進行日期範圍篩選
---
### Requirement: 查詢效能優化
系統 SHALL 實作查詢效能優化措施。
#### Scenario: 日期範圍限制
- **WHEN** 查詢日期範圍超過 365 天
- **THEN** 系統回傳錯誤訊息「查詢範圍不可超過一年」
#### Scenario: 索引使用
- **WHEN** 執行查詢時
- **THEN** 系統確保 SQL 查詢能使用 TXNDATE 索引
#### Scenario: 查詢超時
- **WHEN** 查詢執行超過 60 秒
- **THEN** 系統中斷查詢並回傳超時錯誤

View File

@@ -0,0 +1,68 @@
## 1. 後端服務模組
- [x] 1.1 建立 `src/mes_dashboard/services/resource_history_service.py` 模組結構
- [x] 1.2 實作 `get_filter_options()` 函數:查詢可用的站點、型號列表
- [x] 1.3 實作 `query_summary()` 函數:查詢 KPI、趨勢、熱力圖、站點比較資料
- [x] 1.4 實作 `query_detail()` 函數:查詢階層式明細資料(支援分頁)
- [x] 1.5 實作 `export_csv()` 函數:串流輸出 CSV 格式資料
- [x] 1.6 實作時間粒度聚合邏輯(日/週/月/年 TRUNC 函數)
- [x] 1.7 實作 OU% 計算公式PRD / (PRD+SBY+EGT+SDT+UDT) * 100
- [x] 1.8 實作各 E10 狀態時數與佔比計算
## 2. API 路由
- [x] 2.1 建立 `src/mes_dashboard/routes/resource_history_routes.py` 路由模組
- [x] 2.2 實作 `GET /api/resource/history/options` 端點:篩選選項
- [x] 2.3 實作 `GET /api/resource/history/summary` 端點:摘要資料
- [x] 2.4 實作 `GET /api/resource/history/detail` 端點:明細資料
- [x] 2.5 實作 `GET /api/resource/history/export` 端點CSV 匯出
- [x] 2.6 在 `__init__.py` 註冊新路由 Blueprint
## 3. 前端頁面模板
- [x] 3.1 建立 `src/mes_dashboard/templates/resource_history.html` 頁面模板
- [x] 3.2 實作篩選條件區(日期範圍、粒度按鈕、站點/型號下拉、checkbox
- [x] 3.3 實作 KPI 摘要卡片區OU%、PRD、UDT、SDT、EGT、機台數
- [x] 3.4 實作頁面路由 `GET /resource-history`
## 4. 前端圖表
- [x] 4.1 實作 OU% 趨勢折線圖ECharts line chart
- [x] 4.2 實作 E10 狀態堆疊長條圖ECharts stacked bar chart
- [x] 4.3 實作工站 OU% 對比水平條形圖ECharts horizontal bar
- [x] 4.4 實作設備狀態熱力圖ECharts heatmap
## 5. 前端表格
- [x] 5.1 實作階層式明細表格結構
- [x] 5.2 實作站點層級展開/收合功能
- [x] 5.3 實作型號層級展開/收合功能
- [x] 5.4 實作「全部展開」按鈕
- [x] 5.5 實作表格欄位格式化(時數/佔比顯示)
- [x] 5.6 實作匯出按鈕與 CSV 下載
## 6. 前端互動邏輯
- [x] 6.1 實作查詢按鈕點擊事件與載入指示器
- [x] 6.2 實作日期範圍驗證(不超過 730 天 / 兩年)
- [x] 6.3 實作時間粒度切換邏輯
- [x] 6.4 實作篩選條件變更處理
- [x] 6.5 實作查詢失敗的錯誤處理與 toast 通知
- [x] 6.6 實作初始狀態提示(「請設定查詢條件」)
## 7. 測試與驗證
- [x] 7.1 驗證 API 回傳資料格式正確
- [x] 7.2 驗證 OU% 計算結果正確
- [x] 7.3 驗證各時間粒度聚合正確
- [x] 7.4 驗證階層式表格展開/收合正常
- [x] 7.5 驗證 CSV 匯出內容正確
- [x] 7.6 驗證大量資料查詢效能(日期範圍限制生效)
## 8. 後續優化
- [x] 8.1 放寬查詢日期範圍至 730 天(兩年)
- [x] 8.2 移除明細查詢筆數上限(原 5000 筆)
- [x] 8.3 修復 NaN 值造成 JSON 序列化錯誤
- [x] 8.4 修復 MesApi 成功回應後誤觸重試機制
- [x] 8.5 E10 狀態分布圖表 tooltip 加入百分比顯示

View File

@@ -175,6 +175,11 @@ def create_app(config_name: str | None = None) -> Flask:
"""Excel batch query tool page."""
return render_template('excel_query.html')
@app.route('/resource-history')
def resource_history_page():
"""Resource history analysis page."""
return render_template('resource_history.html')
# ========================================================
# Table Query APIs (for table_data_viewer)
# ========================================================

View File

@@ -11,6 +11,7 @@ from .excel_query_routes import excel_query_bp
from .hold_routes import hold_bp
from .auth_routes import auth_bp
from .admin_routes import admin_bp
from .resource_history_routes import resource_history_bp
def register_routes(app) -> None:
@@ -20,6 +21,7 @@ def register_routes(app) -> None:
app.register_blueprint(dashboard_bp)
app.register_blueprint(excel_query_bp)
app.register_blueprint(hold_bp)
app.register_blueprint(resource_history_bp)
__all__ = [
'wip_bp',
@@ -29,5 +31,6 @@ __all__ = [
'hold_bp',
'auth_bp',
'admin_bp',
'resource_history_bp',
'register_routes',
]

View File

@@ -0,0 +1,239 @@
# -*- coding: utf-8 -*-
"""Resource History Analysis API routes.
Contains Flask Blueprint for historical equipment performance analysis endpoints.
"""
from flask import Blueprint, jsonify, request, render_template, Response
from mes_dashboard.core.cache import cache_get, cache_set, make_cache_key
from mes_dashboard.config.constants import CACHE_TTL_FILTER_OPTIONS, CACHE_TTL_TREND
from mes_dashboard.services.resource_history_service import (
get_filter_options,
query_summary,
query_detail,
export_csv,
)
# Create Blueprint
resource_history_bp = Blueprint(
'resource_history',
__name__,
url_prefix='/api/resource/history'
)
# ============================================================
# Page Route (for template rendering)
# ============================================================
@resource_history_bp.route('/page', methods=['GET'], endpoint='page_alias')
def api_resource_history_page():
"""Render the resource history analysis page.
Note: The actual page route /resource-history is registered separately
in the main app initialization.
"""
return render_template('resource_history.html')
# ============================================================
# API Endpoints
# ============================================================
@resource_history_bp.route('/options', methods=['GET'])
def api_resource_history_options():
"""API: Get filter options (workcenters and families).
Returns:
JSON with workcenters and families lists.
"""
cache_key = make_cache_key("resource_history_options")
options = cache_get(cache_key)
if options is None:
options = get_filter_options()
if options is not None:
cache_set(cache_key, options, ttl=CACHE_TTL_FILTER_OPTIONS)
if options is not None:
return jsonify({'success': True, 'data': options})
return jsonify({'success': False, 'error': '查詢篩選選項失敗'}), 500
@resource_history_bp.route('/summary', methods=['GET'])
def api_resource_history_summary():
"""API: Get summary data (KPI, trend, heatmap, workcenter comparison).
Query Parameters:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
granularity: day|week|month|year (default: day)
workcenter_groups: Optional workcenter group filter (multi-select)
families: Optional resource family filter (multi-select)
is_production: 1 to filter production equipment
is_key: 1 to filter key equipment
is_monitor: 1 to filter monitored equipment
Returns:
JSON with kpi, trend, heatmap, workcenter_comparison sections.
"""
# Parse query parameters
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
granularity = request.args.get('granularity', 'day')
workcenter_groups = request.args.getlist('workcenter_groups') or None
families = request.args.getlist('families') or None
is_production = request.args.get('is_production') == '1'
is_key = request.args.get('is_key') == '1'
is_monitor = request.args.get('is_monitor') == '1'
# Validate required parameters
if not start_date or not end_date:
return jsonify({
'success': False,
'error': '必須提供 start_date 和 end_date 參數'
}), 400
# Build cache key with filters dict
cache_filters = {
'start_date': start_date,
'end_date': end_date,
'granularity': granularity,
'workcenter_groups': sorted(workcenter_groups) if workcenter_groups else None,
'families': sorted(families) if families else None,
'is_production': is_production,
'is_key': is_key,
'is_monitor': is_monitor,
}
cache_key = make_cache_key("resource_history_summary", filters=cache_filters)
result = cache_get(cache_key)
if result is None:
result = query_summary(
start_date=start_date,
end_date=end_date,
granularity=granularity,
workcenter_groups=workcenter_groups,
families=families,
is_production=is_production,
is_key=is_key,
is_monitor=is_monitor,
)
if result is not None and 'error' not in result:
cache_set(cache_key, result, ttl=CACHE_TTL_TREND)
if result is not None:
if 'error' in result:
return jsonify({'success': False, 'error': result['error']}), 400
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢摘要資料失敗'}), 500
@resource_history_bp.route('/detail', methods=['GET'])
def api_resource_history_detail():
"""API: Get hierarchical detail data.
Query Parameters:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
granularity: day|week|month|year (default: day)
workcenter_groups: Optional workcenter group filter (multi-select)
families: Optional resource family filter (multi-select)
is_production: 1 to filter production equipment
is_key: 1 to filter key equipment
is_monitor: 1 to filter monitored equipment
Returns:
JSON with data array, total count, truncated flag.
"""
# Parse query parameters
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
granularity = request.args.get('granularity', 'day')
workcenter_groups = request.args.getlist('workcenter_groups') or None
families = request.args.getlist('families') or None
is_production = request.args.get('is_production') == '1'
is_key = request.args.get('is_key') == '1'
is_monitor = request.args.get('is_monitor') == '1'
# Validate required parameters
if not start_date or not end_date:
return jsonify({
'success': False,
'error': '必須提供 start_date 和 end_date 參數'
}), 400
result = query_detail(
start_date=start_date,
end_date=end_date,
granularity=granularity,
workcenter_groups=workcenter_groups,
families=families,
is_production=is_production,
is_key=is_key,
is_monitor=is_monitor,
)
if result is not None:
if 'error' in result:
return jsonify({'success': False, 'error': result['error']}), 400
return jsonify({'success': True, **result})
return jsonify({'success': False, 'error': '查詢明細資料失敗'}), 500
@resource_history_bp.route('/export', methods=['GET'])
def api_resource_history_export():
"""API: Export detail data as CSV.
Query Parameters:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
granularity: day|week|month|year (default: day)
workcenter_groups: Optional workcenter group filter (multi-select)
families: Optional resource family filter (multi-select)
is_production: 1 to filter production equipment
is_key: 1 to filter key equipment
is_monitor: 1 to filter monitored equipment
Returns:
CSV file download.
"""
# Parse query parameters
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
granularity = request.args.get('granularity', 'day')
workcenter_groups = request.args.getlist('workcenter_groups') or None
families = request.args.getlist('families') or None
is_production = request.args.get('is_production') == '1'
is_key = request.args.get('is_key') == '1'
is_monitor = request.args.get('is_monitor') == '1'
# Validate required parameters
if not start_date or not end_date:
return jsonify({
'success': False,
'error': '必須提供 start_date 和 end_date 參數'
}), 400
# Generate filename
filename = f"resource_history_{start_date}_to_{end_date}.csv"
# Stream CSV response
return Response(
export_csv(
start_date=start_date,
end_date=end_date,
granularity=granularity,
workcenter_groups=workcenter_groups,
families=families,
is_production=is_production,
is_key=is_key,
is_monitor=is_monitor,
),
mimetype='text/csv',
headers={
'Content-Disposition': f'attachment; filename={filename}',
'Content-Type': 'text/csv; charset=utf-8-sig'
}
)

View File

@@ -0,0 +1,292 @@
# -*- coding: utf-8 -*-
"""Cached filter options for MES Dashboard.
Provides cached workcenter groups and resource families for filter dropdowns.
Data is loaded from database and cached in memory with periodic refresh.
"""
import logging
import threading
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from mes_dashboard.core.database import read_sql_df
from mes_dashboard.config.constants import (
EXCLUDED_LOCATIONS,
EXCLUDED_ASSET_STATUSES,
EQUIPMENT_TYPE_FILTER,
)
logger = logging.getLogger('mes_dashboard.filter_cache')
# ============================================================
# Cache Configuration
# ============================================================
CACHE_TTL_SECONDS = 3600 # 1 hour cache TTL
WIP_VIEW = "DWH.DW_PJ_LOT_V"
# ============================================================
# Cache Storage
# ============================================================
_CACHE = {
'workcenter_groups': None, # List of {name, sequence}
'workcenter_mapping': None, # Dict {workcentername: {group, sequence}}
'resource_families': None, # List of family names
'last_refresh': None,
'is_loading': False,
}
_CACHE_LOCK = threading.Lock()
# ============================================================
# Workcenter Group Functions
# ============================================================
def get_workcenter_groups(force_refresh: bool = False) -> Optional[List[Dict[str, Any]]]:
"""Get list of workcenter groups with sequence order.
Returns:
List of {name, sequence} sorted by sequence, or None if loading fails.
"""
_ensure_cache_loaded(force_refresh)
return _CACHE.get('workcenter_groups')
def get_workcenter_mapping(force_refresh: bool = False) -> Optional[Dict[str, Dict[str, Any]]]:
"""Get workcenter name to group mapping.
Returns:
Dict mapping workcentername to {group, sequence}, or None if loading fails.
"""
_ensure_cache_loaded(force_refresh)
return _CACHE.get('workcenter_mapping')
def get_workcenters_for_groups(groups: List[str]) -> List[str]:
"""Get list of workcenter names that belong to specified groups.
Args:
groups: List of WORKCENTER_GROUP names
Returns:
List of WORKCENTERNAME values belonging to those groups
"""
mapping = get_workcenter_mapping()
if not mapping:
return []
result = []
for wc_name, info in mapping.items():
if info.get('group') in groups:
result.append(wc_name)
return result
# ============================================================
# Resource Family Functions
# ============================================================
def get_resource_families(force_refresh: bool = False) -> Optional[List[str]]:
"""Get list of resource family names.
Returns:
Sorted list of RESOURCEFAMILYNAME values, or None if loading fails.
"""
_ensure_cache_loaded(force_refresh)
return _CACHE.get('resource_families')
# ============================================================
# Cache Management
# ============================================================
def get_cache_status() -> Dict[str, Any]:
"""Get current cache status.
Returns:
Dict with cache status information
"""
with _CACHE_LOCK:
last_refresh = _CACHE.get('last_refresh')
return {
'loaded': last_refresh is not None,
'last_refresh': last_refresh.isoformat() if last_refresh else None,
'is_loading': _CACHE.get('is_loading', False),
'workcenter_groups_count': len(_CACHE.get('workcenter_groups') or []),
'workcenter_mapping_count': len(_CACHE.get('workcenter_mapping') or {}),
'resource_families_count': len(_CACHE.get('resource_families') or []),
}
def refresh_cache() -> bool:
"""Force refresh the cache.
Returns:
True if refresh succeeded, False otherwise
"""
return _load_cache()
def _ensure_cache_loaded(force_refresh: bool = False):
"""Ensure cache is loaded and not stale."""
with _CACHE_LOCK:
now = datetime.now()
last_refresh = _CACHE.get('last_refresh')
is_loading = _CACHE.get('is_loading', False)
# Check if cache is valid
cache_valid = (
last_refresh is not None and
(now - last_refresh).total_seconds() < CACHE_TTL_SECONDS
)
if cache_valid and not force_refresh:
return
if is_loading:
return # Another thread is loading
# Load cache (outside lock to avoid blocking)
_load_cache()
def _load_cache() -> bool:
"""Load all cache data from database.
Returns:
True if loading succeeded, False otherwise
"""
with _CACHE_LOCK:
if _CACHE.get('is_loading'):
return False
_CACHE['is_loading'] = True
try:
# Load workcenter groups from DW_PJ_LOT_V
wc_groups, wc_mapping = _load_workcenter_data()
# Load resource families from DW_MES_RESOURCE
families = _load_resource_families()
with _CACHE_LOCK:
_CACHE['workcenter_groups'] = wc_groups
_CACHE['workcenter_mapping'] = wc_mapping
_CACHE['resource_families'] = families
_CACHE['last_refresh'] = datetime.now()
_CACHE['is_loading'] = False
logger.info(
f"Filter cache refreshed: {len(wc_groups or [])} groups, "
f"{len(wc_mapping or {})} workcenters, {len(families or [])} families"
)
return True
except Exception as exc:
logger.error(f"Failed to load filter cache: {exc}")
with _CACHE_LOCK:
_CACHE['is_loading'] = False
return False
def _load_workcenter_data():
"""Load workcenter group data from DW_PJ_LOT_V.
Returns:
Tuple of (groups_list, mapping_dict)
"""
try:
sql = f"""
SELECT DISTINCT
WORKCENTERNAME,
WORKCENTERID,
WORKCENTER_GROUP,
WORKCENTERSEQUENCE_GROUP
FROM {WIP_VIEW}
WHERE WORKCENTER_GROUP IS NOT NULL
AND WORKCENTERNAME IS NOT NULL
"""
df = read_sql_df(sql)
if df is None or df.empty:
logger.warning("No workcenter data found in DW_PJ_LOT_V")
return [], {}
# Build groups list (unique groups, take minimum sequence for each group)
groups_df = df.groupby('WORKCENTER_GROUP')['WORKCENTERSEQUENCE_GROUP'].min().reset_index()
groups_df = groups_df.sort_values('WORKCENTERSEQUENCE_GROUP')
groups = []
for _, row in groups_df.iterrows():
groups.append({
'name': row['WORKCENTER_GROUP'],
'sequence': int(row['WORKCENTERSEQUENCE_GROUP'] or 999)
})
# Build mapping dict
mapping = {}
for _, row in df.iterrows():
wc_name = row['WORKCENTERNAME']
mapping[wc_name] = {
'id': row.get('WORKCENTERID'),
'group': row['WORKCENTER_GROUP'],
'sequence': int(row['WORKCENTERSEQUENCE_GROUP'] or 999)
}
return groups, mapping
except Exception as exc:
logger.error(f"Failed to load workcenter data: {exc}")
return [], {}
def _load_resource_families():
"""Load resource family data from DW_MES_RESOURCE.
Returns:
Sorted list of family names
"""
try:
# Build exclusion filters (note: column name is LOCATIONNAME, not LOCATION)
location_list = ", ".join(f"'{loc}'" for loc in EXCLUDED_LOCATIONS)
location_filter = f"AND (r.LOCATIONNAME IS NULL OR r.LOCATIONNAME NOT IN ({location_list}))" if EXCLUDED_LOCATIONS else ""
# Note: Column name is PJ_ASSETSSTATUS (double S), not ASSETSTATUS
status_list = ", ".join(f"'{s}'" for s in EXCLUDED_ASSET_STATUSES)
asset_status_filter = f"AND r.PJ_ASSETSSTATUS NOT IN ({status_list})" if EXCLUDED_ASSET_STATUSES else ""
sql = f"""
SELECT DISTINCT RESOURCEFAMILYNAME
FROM DW_MES_RESOURCE r
WHERE {EQUIPMENT_TYPE_FILTER}
{location_filter}
{asset_status_filter}
AND RESOURCEFAMILYNAME IS NOT NULL
"""
df = read_sql_df(sql)
if df is None or df.empty:
logger.warning("No resource family data found")
return []
families = df['RESOURCEFAMILYNAME'].dropna().unique().tolist()
return sorted(families)
except Exception as exc:
logger.error(f"Failed to load resource families: {exc}")
return []
# ============================================================
# Initialization
# ============================================================
def init_cache():
"""Initialize the cache on application startup.
Should be called during app initialization.
"""
logger.info("Initializing filter cache...")
_load_cache()

View File

@@ -0,0 +1,844 @@
# -*- coding: utf-8 -*-
"""Resource History Analysis Service.
Provides functions for querying historical equipment performance data including:
- Filter options (workcenters, families)
- Summary data (KPI, trend, heatmap, workcenter comparison)
- Hierarchical detail data (workcenter → family → resource)
- CSV export with streaming
"""
import io
import csv
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any, Generator
import pandas as pd
from mes_dashboard.core.database import read_sql_df
from mes_dashboard.config.constants import (
EXCLUDED_LOCATIONS,
EXCLUDED_ASSET_STATUSES,
EQUIPMENT_TYPE_FILTER,
EQUIPMENT_FLAG_FILTERS,
)
logger = logging.getLogger('mes_dashboard.resource_history')
# Maximum allowed query range in days
MAX_QUERY_DAYS = 730
# E10 Status definitions
E10_STATUSES = ['PRD', 'SBY', 'UDT', 'SDT', 'EGT', 'NST']
# ============================================================
# Filter Options
# ============================================================
def get_filter_options() -> Optional[Dict[str, Any]]:
"""Get filter options from cache.
Uses cached workcenter groups from DW_PJ_LOT_V and resource families from DW_MES_RESOURCE.
Returns:
Dict with:
- 'workcenter_groups': List of {name, sequence} sorted by sequence
- 'families': List of family names sorted alphabetically
Or None if cache loading fails.
"""
from mes_dashboard.services.filter_cache import (
get_workcenter_groups,
get_resource_families,
)
try:
groups = get_workcenter_groups()
families = get_resource_families()
if groups is None or families is None:
logger.error("Filter cache not available")
return None
return {
'workcenter_groups': groups,
'families': families
}
except Exception as exc:
logger.error(f"Filter options query failed: {exc}")
return None
# ============================================================
# Summary Query
# ============================================================
def query_summary(
start_date: str,
end_date: str,
granularity: str = 'day',
workcenter_groups: Optional[List[str]] = None,
families: Optional[List[str]] = None,
is_production: bool = False,
is_key: bool = False,
is_monitor: bool = False,
) -> Optional[Dict[str, Any]]:
"""Query summary data including KPI, trend, heatmap, and workcenter comparison.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
granularity: Time granularity ('day', 'week', 'month', 'year')
workcenter_groups: Optional list of WORKCENTER_GROUP names to filter
families: Optional list of RESOURCEFAMILYNAME values to filter
is_production: Filter by production flag
is_key: Filter by key equipment flag
is_monitor: Filter by monitor flag
Returns:
Dict with 'kpi', 'trend', 'heatmap', 'workcenter_comparison' sections,
or None if query fails.
"""
# Validate date range
validation = _validate_date_range(start_date, end_date)
if validation:
return {'error': validation}
try:
# Build SQL components
date_trunc = _get_date_trunc(granularity)
location_filter = _build_location_filter('r')
asset_status_filter = _build_asset_status_filter('r')
equipment_filter = _build_equipment_flags_filter(is_production, is_key, is_monitor, 'r')
workcenter_filter = _build_workcenter_groups_filter(workcenter_groups, 'r')
family_filter = _build_families_filter(families, 'r')
# Common CTE with MATERIALIZE hint to force Oracle to materialize the subquery
# This prevents the optimizer from inlining the CTE multiple times
base_cte = f"""
WITH shift_data AS (
SELECT /*+ MATERIALIZE */ HISTORYID, TXNDATE, OLDSTATUSNAME, HOURS
FROM DW_MES_RESOURCESTATUS_SHIFT
WHERE TXNDATE >= TO_DATE('{start_date}', 'YYYY-MM-DD')
AND TXNDATE < TO_DATE('{end_date}', 'YYYY-MM-DD') + 1
)
"""
# Common filter conditions
common_filters = f"""
WHERE {EQUIPMENT_TYPE_FILTER}
{location_filter}
{asset_status_filter}
{equipment_filter}
{workcenter_filter}
{family_filter}
"""
# Build all 4 SQL queries
kpi_sql = f"""
{base_cte}
SELECT
SUM(CASE WHEN ss.OLDSTATUSNAME = 'PRD' THEN ss.HOURS ELSE 0 END) as PRD_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SBY' THEN ss.HOURS ELSE 0 END) as SBY_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'UDT' THEN ss.HOURS ELSE 0 END) as UDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SDT' THEN ss.HOURS ELSE 0 END) as SDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'EGT' THEN ss.HOURS ELSE 0 END) as EGT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'NST' THEN ss.HOURS ELSE 0 END) as NST_HOURS,
COUNT(DISTINCT ss.HISTORYID) as MACHINE_COUNT
FROM shift_data ss
JOIN DW_MES_RESOURCE r ON ss.HISTORYID = r.RESOURCEID
{common_filters}
"""
trend_sql = f"""
{base_cte}
SELECT
{date_trunc} as DATA_DATE,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'PRD' THEN ss.HOURS ELSE 0 END) as PRD_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SBY' THEN ss.HOURS ELSE 0 END) as SBY_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'UDT' THEN ss.HOURS ELSE 0 END) as UDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SDT' THEN ss.HOURS ELSE 0 END) as SDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'EGT' THEN ss.HOURS ELSE 0 END) as EGT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'NST' THEN ss.HOURS ELSE 0 END) as NST_HOURS,
COUNT(DISTINCT ss.HISTORYID) as MACHINE_COUNT
FROM shift_data ss
JOIN DW_MES_RESOURCE r ON ss.HISTORYID = r.RESOURCEID
{common_filters}
GROUP BY {date_trunc}
ORDER BY DATA_DATE
"""
heatmap_sql = f"""
{base_cte}
SELECT
r.WORKCENTERNAME,
{date_trunc} as DATA_DATE,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'PRD' THEN ss.HOURS ELSE 0 END) as PRD_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SBY' THEN ss.HOURS ELSE 0 END) as SBY_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'UDT' THEN ss.HOURS ELSE 0 END) as UDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SDT' THEN ss.HOURS ELSE 0 END) as SDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'EGT' THEN ss.HOURS ELSE 0 END) as EGT_HOURS
FROM shift_data ss
JOIN DW_MES_RESOURCE r ON ss.HISTORYID = r.RESOURCEID
WHERE r.WORKCENTERNAME IS NOT NULL
AND {EQUIPMENT_TYPE_FILTER}
{location_filter}
{asset_status_filter}
{equipment_filter}
{workcenter_filter}
{family_filter}
GROUP BY r.WORKCENTERNAME, {date_trunc}
ORDER BY r.WORKCENTERNAME, DATA_DATE
"""
comparison_sql = f"""
{base_cte}
SELECT
r.WORKCENTERNAME,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'PRD' THEN ss.HOURS ELSE 0 END) as PRD_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SBY' THEN ss.HOURS ELSE 0 END) as SBY_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'UDT' THEN ss.HOURS ELSE 0 END) as UDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SDT' THEN ss.HOURS ELSE 0 END) as SDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'EGT' THEN ss.HOURS ELSE 0 END) as EGT_HOURS,
COUNT(DISTINCT ss.HISTORYID) as MACHINE_COUNT
FROM shift_data ss
JOIN DW_MES_RESOURCE r ON ss.HISTORYID = r.RESOURCEID
WHERE r.WORKCENTERNAME IS NOT NULL
AND {EQUIPMENT_TYPE_FILTER}
{location_filter}
{asset_status_filter}
{equipment_filter}
{workcenter_filter}
{family_filter}
GROUP BY r.WORKCENTERNAME
ORDER BY PRD_HOURS DESC
"""
# Execute all 4 queries in parallel using ThreadPoolExecutor
results = {}
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(read_sql_df, kpi_sql): 'kpi',
executor.submit(read_sql_df, trend_sql): 'trend',
executor.submit(read_sql_df, heatmap_sql): 'heatmap',
executor.submit(read_sql_df, comparison_sql): 'comparison',
}
for future in as_completed(futures):
query_name = futures[future]
try:
results[query_name] = future.result()
except Exception as exc:
logger.error(f"{query_name} query failed: {exc}")
results[query_name] = pd.DataFrame()
# Build response from results
kpi = _build_kpi_from_df(results.get('kpi', pd.DataFrame()))
trend = _build_trend_from_df(results.get('trend', pd.DataFrame()), granularity)
heatmap = _build_heatmap_from_df(results.get('heatmap', pd.DataFrame()), granularity)
workcenter_comparison = _build_comparison_from_df(results.get('comparison', pd.DataFrame()))
return {
'kpi': kpi,
'trend': trend,
'heatmap': heatmap,
'workcenter_comparison': workcenter_comparison
}
except Exception as exc:
logger.error(f"Summary query failed: {exc}")
import traceback
traceback.print_exc()
return None
# ============================================================
# Detail Query
# ============================================================
# Maximum records limit for detail query (disabled - no limit)
# MAX_DETAIL_RECORDS = 5000
def query_detail(
start_date: str,
end_date: str,
granularity: str = 'day',
workcenter_groups: Optional[List[str]] = None,
families: Optional[List[str]] = None,
is_production: bool = False,
is_key: bool = False,
is_monitor: bool = False,
) -> Optional[Dict[str, Any]]:
"""Query hierarchical detail data.
Returns flat data with workcenter, family, resource dimensions.
Frontend handles hierarchy assembly.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
granularity: Time granularity ('day', 'week', 'month', 'year')
workcenter_groups: Optional list of WORKCENTER_GROUP names to filter
families: Optional list of RESOURCEFAMILYNAME values to filter
is_production: Filter by production flag
is_key: Filter by key equipment flag
is_monitor: Filter by monitor flag
Returns:
Dict with 'data', 'total', 'truncated' fields,
or None if query fails.
"""
# Validate date range
validation = _validate_date_range(start_date, end_date)
if validation:
return {'error': validation}
try:
# Build SQL components
location_filter = _build_location_filter('r')
asset_status_filter = _build_asset_status_filter('r')
equipment_filter = _build_equipment_flags_filter(is_production, is_key, is_monitor, 'r')
workcenter_filter = _build_workcenter_groups_filter(workcenter_groups, 'r')
family_filter = _build_families_filter(families, 'r')
# Common CTE with MATERIALIZE hint
base_cte = f"""
WITH shift_data AS (
SELECT /*+ MATERIALIZE */ HISTORYID, OLDSTATUSNAME, HOURS
FROM DW_MES_RESOURCESTATUS_SHIFT
WHERE TXNDATE >= TO_DATE('{start_date}', 'YYYY-MM-DD')
AND TXNDATE < TO_DATE('{end_date}', 'YYYY-MM-DD') + 1
)
"""
# Common filter conditions
common_filters = f"""
WHERE {EQUIPMENT_TYPE_FILTER}
{location_filter}
{asset_status_filter}
{equipment_filter}
{workcenter_filter}
{family_filter}
"""
# Query all detail data (no pagination)
detail_sql = f"""
{base_cte}
SELECT
r.WORKCENTERNAME,
r.RESOURCEFAMILYNAME,
r.RESOURCENAME,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'PRD' THEN ss.HOURS ELSE 0 END) as PRD_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SBY' THEN ss.HOURS ELSE 0 END) as SBY_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'UDT' THEN ss.HOURS ELSE 0 END) as UDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SDT' THEN ss.HOURS ELSE 0 END) as SDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'EGT' THEN ss.HOURS ELSE 0 END) as EGT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'NST' THEN ss.HOURS ELSE 0 END) as NST_HOURS,
SUM(ss.HOURS) as TOTAL_HOURS
FROM shift_data ss
JOIN DW_MES_RESOURCE r ON ss.HISTORYID = r.RESOURCEID
{common_filters}
GROUP BY r.WORKCENTERNAME, r.RESOURCEFAMILYNAME, r.RESOURCENAME
ORDER BY r.WORKCENTERNAME, r.RESOURCEFAMILYNAME, r.RESOURCENAME
"""
detail_df = read_sql_df(detail_sql)
total = len(detail_df) if detail_df is not None else 0
data = _build_detail_from_df(detail_df)
return {
'data': data,
'total': total,
'truncated': False,
'max_records': None
}
except Exception as exc:
logger.error(f"Detail query failed: {exc}")
import traceback
traceback.print_exc()
return None
# ============================================================
# CSV Export
# ============================================================
def export_csv(
start_date: str,
end_date: str,
granularity: str = 'day',
workcenter_groups: Optional[List[str]] = None,
families: Optional[List[str]] = None,
is_production: bool = False,
is_key: bool = False,
is_monitor: bool = False,
) -> Generator[str, None, None]:
"""Generate CSV data as a stream for export.
Yields CSV rows one at a time to avoid memory issues with large datasets.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
granularity: Time granularity
workcenter_groups: Optional list of WORKCENTER_GROUP names to filter
families: Optional list of RESOURCEFAMILYNAME values to filter
is_production: Filter by production flag
is_key: Filter by key equipment flag
is_monitor: Filter by monitor flag
Yields:
CSV rows as strings
"""
# Validate date range
validation = _validate_date_range(start_date, end_date)
if validation:
yield f"Error: {validation}\n"
return
try:
# Build SQL components
location_filter = _build_location_filter('r')
asset_status_filter = _build_asset_status_filter('r')
equipment_filter = _build_equipment_flags_filter(is_production, is_key, is_monitor, 'r')
workcenter_filter = _build_workcenter_groups_filter(workcenter_groups, 'r')
family_filter = _build_families_filter(families, 'r')
# Query all data with CTE and MATERIALIZE hint for performance optimization
sql = f"""
WITH shift_data AS (
SELECT /*+ MATERIALIZE */ HISTORYID, OLDSTATUSNAME, HOURS
FROM DW_MES_RESOURCESTATUS_SHIFT
WHERE TXNDATE >= TO_DATE('{start_date}', 'YYYY-MM-DD')
AND TXNDATE < TO_DATE('{end_date}', 'YYYY-MM-DD') + 1
)
SELECT
r.WORKCENTERNAME,
r.RESOURCEFAMILYNAME,
r.RESOURCENAME,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'PRD' THEN ss.HOURS ELSE 0 END) as PRD_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SBY' THEN ss.HOURS ELSE 0 END) as SBY_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'UDT' THEN ss.HOURS ELSE 0 END) as UDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'SDT' THEN ss.HOURS ELSE 0 END) as SDT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'EGT' THEN ss.HOURS ELSE 0 END) as EGT_HOURS,
SUM(CASE WHEN ss.OLDSTATUSNAME = 'NST' THEN ss.HOURS ELSE 0 END) as NST_HOURS,
SUM(ss.HOURS) as TOTAL_HOURS
FROM shift_data ss
JOIN DW_MES_RESOURCE r ON ss.HISTORYID = r.RESOURCEID
WHERE {EQUIPMENT_TYPE_FILTER}
{location_filter}
{asset_status_filter}
{equipment_filter}
{workcenter_filter}
{family_filter}
GROUP BY r.WORKCENTERNAME, r.RESOURCEFAMILYNAME, r.RESOURCENAME
ORDER BY r.WORKCENTERNAME, r.RESOURCEFAMILYNAME, r.RESOURCENAME
"""
df = read_sql_df(sql)
# Get workcenter mapping to convert WORKCENTERNAME to WORKCENTER_GROUP
from mes_dashboard.services.filter_cache import get_workcenter_mapping
wc_mapping = get_workcenter_mapping() or {}
# Write CSV header
output = io.StringIO()
writer = csv.writer(output)
headers = [
'站點', '型號', '機台', 'OU%',
'PRD(h)', 'PRD(%)', 'SBY(h)', 'SBY(%)',
'UDT(h)', 'UDT(%)', 'SDT(h)', 'SDT(%)',
'EGT(h)', 'EGT(%)', 'NST(h)', 'NST(%)'
]
writer.writerow(headers)
yield output.getvalue()
output.truncate(0)
output.seek(0)
# Write data rows
for _, row in df.iterrows():
prd = float(row['PRD_HOURS'] or 0)
sby = float(row['SBY_HOURS'] or 0)
udt = float(row['UDT_HOURS'] or 0)
sdt = float(row['SDT_HOURS'] or 0)
egt = float(row['EGT_HOURS'] or 0)
nst = float(row['NST_HOURS'] or 0)
total = float(row['TOTAL_HOURS'] or 0)
# Map WORKCENTERNAME to WORKCENTER_GROUP
wc_name = row['WORKCENTERNAME']
wc_info = wc_mapping.get(wc_name, {})
wc_group = wc_info.get('group', wc_name) # Fallback to workcentername if no mapping
# Calculate percentages
ou_pct = _calc_ou_pct(prd, sby, udt, sdt, egt)
prd_pct = round(prd / total * 100, 1) if total > 0 else 0
sby_pct = round(sby / total * 100, 1) if total > 0 else 0
udt_pct = round(udt / total * 100, 1) if total > 0 else 0
sdt_pct = round(sdt / total * 100, 1) if total > 0 else 0
egt_pct = round(egt / total * 100, 1) if total > 0 else 0
nst_pct = round(nst / total * 100, 1) if total > 0 else 0
csv_row = [
wc_group,
row['RESOURCEFAMILYNAME'],
row['RESOURCENAME'],
f"{ou_pct}%",
round(prd, 1), f"{prd_pct}%",
round(sby, 1), f"{sby_pct}%",
round(udt, 1), f"{udt_pct}%",
round(sdt, 1), f"{sdt_pct}%",
round(egt, 1), f"{egt_pct}%",
round(nst, 1), f"{nst_pct}%"
]
writer.writerow(csv_row)
yield output.getvalue()
output.truncate(0)
output.seek(0)
except Exception as exc:
logger.error(f"CSV export failed: {exc}")
yield f"Error: {exc}\n"
# ============================================================
# Helper Functions
# ============================================================
def _validate_date_range(start_date: str, end_date: str) -> Optional[str]:
"""Validate date range doesn't exceed MAX_QUERY_DAYS."""
try:
start = datetime.strptime(start_date, '%Y-%m-%d')
end = datetime.strptime(end_date, '%Y-%m-%d')
diff = (end - start).days
if diff > MAX_QUERY_DAYS:
return f'查詢範圍不可超過 {MAX_QUERY_DAYS} 天(兩年)'
if diff < 0:
return '結束日期必須大於起始日期'
return None
except ValueError as e:
return f'日期格式錯誤: {e}'
def _get_date_trunc(granularity: str) -> str:
"""Get Oracle TRUNC expression for date granularity."""
trunc_map = {
'day': "TRUNC(ss.TXNDATE)",
'week': "TRUNC(ss.TXNDATE, 'IW')",
'month': "TRUNC(ss.TXNDATE, 'MM')",
'year': "TRUNC(ss.TXNDATE, 'YYYY')"
}
return trunc_map.get(granularity, "TRUNC(ss.TXNDATE)")
def _build_location_filter(alias: str) -> str:
"""Build SQL filter for excluded locations."""
if not EXCLUDED_LOCATIONS:
return ""
excluded = "', '".join(EXCLUDED_LOCATIONS)
return f"AND ({alias}.LOCATIONNAME IS NULL OR {alias}.LOCATIONNAME NOT IN ('{excluded}'))"
def _build_asset_status_filter(alias: str) -> str:
"""Build SQL filter for excluded asset statuses."""
if not EXCLUDED_ASSET_STATUSES:
return ""
excluded = "', '".join(EXCLUDED_ASSET_STATUSES)
return f"AND ({alias}.PJ_ASSETSSTATUS IS NULL OR {alias}.PJ_ASSETSSTATUS NOT IN ('{excluded}'))"
def _build_equipment_flags_filter(
is_production: bool,
is_key: bool,
is_monitor: bool,
alias: str
) -> str:
"""Build SQL filter for equipment flags."""
conditions = []
if is_production:
conditions.append(f"NVL({alias}.PJ_ISPRODUCTION, 0) = 1")
if is_key:
conditions.append(f"NVL({alias}.PJ_ISKEY, 0) = 1")
if is_monitor:
conditions.append(f"NVL({alias}.PJ_ISMONITOR, 0) = 1")
return "AND " + " AND ".join(conditions) if conditions else ""
def _build_workcenter_groups_filter(groups: Optional[List[str]], alias: str) -> str:
"""Build SQL filter for workcenter groups.
Uses filter_cache to get workcentername list for selected groups.
Args:
groups: List of WORKCENTER_GROUP names, or None for no filter
alias: Table alias for WORKCENTERNAME column
Returns:
SQL filter clause (empty string if no filter)
"""
if not groups:
return ""
from mes_dashboard.services.filter_cache import get_workcenters_for_groups
workcenters = get_workcenters_for_groups(groups)
if not workcenters:
return ""
# Escape single quotes and build IN clause
escaped = [wc.replace("'", "''") for wc in workcenters]
in_list = "', '".join(escaped)
return f"AND {alias}.WORKCENTERNAME IN ('{in_list}')"
def _build_families_filter(families: Optional[List[str]], alias: str) -> str:
"""Build SQL filter for resource families.
Args:
families: List of RESOURCEFAMILYNAME values, or None for no filter
alias: Table alias for RESOURCEFAMILYNAME column
Returns:
SQL filter clause (empty string if no filter)
"""
if not families:
return ""
# Escape single quotes and build IN clause
escaped = [f.replace("'", "''") for f in families]
in_list = "', '".join(escaped)
return f"AND {alias}.RESOURCEFAMILYNAME IN ('{in_list}')"
def _safe_float(value, default=0.0) -> float:
"""Safely convert value to float, handling NaN and None."""
if value is None or pd.isna(value):
return default
return float(value)
def _calc_ou_pct(prd: float, sby: float, udt: float, sdt: float, egt: float) -> float:
"""Calculate OU% = PRD / (PRD + SBY + UDT + SDT + EGT) * 100."""
denominator = prd + sby + udt + sdt + egt
return round(prd / denominator * 100, 1) if denominator > 0 else 0
def _build_kpi_from_df(df: pd.DataFrame) -> Dict[str, Any]:
"""Build KPI dict from query result DataFrame."""
if df is None or len(df) == 0:
return {
'ou_pct': 0,
'prd_hours': 0,
'sby_hours': 0,
'udt_hours': 0,
'sdt_hours': 0,
'egt_hours': 0,
'nst_hours': 0,
'machine_count': 0
}
row = df.iloc[0]
prd = _safe_float(row['PRD_HOURS'])
sby = _safe_float(row['SBY_HOURS'])
udt = _safe_float(row['UDT_HOURS'])
sdt = _safe_float(row['SDT_HOURS'])
egt = _safe_float(row['EGT_HOURS'])
nst = _safe_float(row['NST_HOURS'])
machine_count = int(_safe_float(row['MACHINE_COUNT']))
return {
'ou_pct': _calc_ou_pct(prd, sby, udt, sdt, egt),
'prd_hours': round(prd, 1),
'sby_hours': round(sby, 1),
'udt_hours': round(udt, 1),
'sdt_hours': round(sdt, 1),
'egt_hours': round(egt, 1),
'nst_hours': round(nst, 1),
'machine_count': machine_count
}
def _format_date(date_val, granularity: str) -> Optional[str]:
"""Format date value based on granularity."""
if pd.isna(date_val):
return None
if granularity == 'year':
return date_val.strftime('%Y')
elif granularity == 'month':
return date_val.strftime('%Y-%m')
elif granularity == 'week':
return date_val.strftime('%Y-%m-%d') # Week start date
else:
return date_val.strftime('%Y-%m-%d')
def _build_trend_from_df(df: pd.DataFrame, granularity: str) -> List[Dict]:
"""Build trend data from query result DataFrame."""
if df is None or len(df) == 0:
return []
result = []
for _, row in df.iterrows():
prd = _safe_float(row['PRD_HOURS'])
sby = _safe_float(row['SBY_HOURS'])
udt = _safe_float(row['UDT_HOURS'])
sdt = _safe_float(row['SDT_HOURS'])
egt = _safe_float(row['EGT_HOURS'])
nst = _safe_float(row['NST_HOURS'])
result.append({
'date': _format_date(row['DATA_DATE'], granularity),
'ou_pct': _calc_ou_pct(prd, sby, udt, sdt, egt),
'prd_hours': round(prd, 1),
'sby_hours': round(sby, 1),
'udt_hours': round(udt, 1),
'sdt_hours': round(sdt, 1),
'egt_hours': round(egt, 1),
'nst_hours': round(nst, 1)
})
return result
def _build_heatmap_from_df(df: pd.DataFrame, granularity: str) -> List[Dict]:
"""Build heatmap data from query result DataFrame."""
if df is None or len(df) == 0:
return []
# Get workcenter mapping to convert WORKCENTERNAME to WORKCENTER_GROUP
from mes_dashboard.services.filter_cache import get_workcenter_mapping
wc_mapping = get_workcenter_mapping() or {}
# Aggregate data by WORKCENTER_GROUP and date
aggregated = {}
for _, row in df.iterrows():
wc_name = row['WORKCENTERNAME']
# Skip rows with NaN workcenter name
if pd.isna(wc_name):
continue
wc_info = wc_mapping.get(wc_name, {})
wc_group = wc_info.get('group', wc_name)
date_str = _format_date(row['DATA_DATE'], granularity)
key = (wc_group, date_str)
if key not in aggregated:
aggregated[key] = {'prd': 0, 'sby': 0, 'udt': 0, 'sdt': 0, 'egt': 0}
aggregated[key]['prd'] += _safe_float(row['PRD_HOURS'])
aggregated[key]['sby'] += _safe_float(row['SBY_HOURS'])
aggregated[key]['udt'] += _safe_float(row['UDT_HOURS'])
aggregated[key]['sdt'] += _safe_float(row['SDT_HOURS'])
aggregated[key]['egt'] += _safe_float(row['EGT_HOURS'])
result = []
for (wc_group, date_str), data in aggregated.items():
result.append({
'workcenter': wc_group,
'date': date_str,
'ou_pct': _calc_ou_pct(data['prd'], data['sby'], data['udt'], data['sdt'], data['egt'])
})
# Sort by workcenter and date
result.sort(key=lambda x: (x['workcenter'], x['date'] or ''))
return result
def _build_comparison_from_df(df: pd.DataFrame) -> List[Dict]:
"""Build workcenter comparison data from query result DataFrame."""
if df is None or len(df) == 0:
return []
# Get workcenter mapping to convert WORKCENTERNAME to WORKCENTER_GROUP
from mes_dashboard.services.filter_cache import get_workcenter_mapping
wc_mapping = get_workcenter_mapping() or {}
# Aggregate data by WORKCENTER_GROUP
aggregated = {}
for _, row in df.iterrows():
wc_name = row['WORKCENTERNAME']
# Skip rows with NaN workcenter name
if pd.isna(wc_name):
continue
wc_info = wc_mapping.get(wc_name, {})
wc_group = wc_info.get('group', wc_name)
if wc_group not in aggregated:
aggregated[wc_group] = {'prd': 0, 'sby': 0, 'udt': 0, 'sdt': 0, 'egt': 0, 'machine_count': 0}
aggregated[wc_group]['prd'] += _safe_float(row['PRD_HOURS'])
aggregated[wc_group]['sby'] += _safe_float(row['SBY_HOURS'])
aggregated[wc_group]['udt'] += _safe_float(row['UDT_HOURS'])
aggregated[wc_group]['sdt'] += _safe_float(row['SDT_HOURS'])
aggregated[wc_group]['egt'] += _safe_float(row['EGT_HOURS'])
aggregated[wc_group]['machine_count'] += int(_safe_float(row['MACHINE_COUNT']))
result = []
for wc_group, data in aggregated.items():
result.append({
'workcenter': wc_group,
'ou_pct': _calc_ou_pct(data['prd'], data['sby'], data['udt'], data['sdt'], data['egt']),
'prd_hours': round(data['prd'], 1),
'machine_count': data['machine_count']
})
# Sort by OU% descending
result.sort(key=lambda x: x['ou_pct'], reverse=True)
return result
def _build_detail_from_df(df: pd.DataFrame) -> List[Dict]:
"""Build detail data from query result DataFrame."""
if df is None or len(df) == 0:
return []
# Get workcenter mapping to convert WORKCENTERNAME to WORKCENTER_GROUP
from mes_dashboard.services.filter_cache import get_workcenter_mapping
wc_mapping = get_workcenter_mapping() or {}
result = []
for _, row in df.iterrows():
# Skip rows with NaN workcenter name
wc_name = row['WORKCENTERNAME']
if pd.isna(wc_name):
continue
prd = _safe_float(row['PRD_HOURS'])
sby = _safe_float(row['SBY_HOURS'])
udt = _safe_float(row['UDT_HOURS'])
sdt = _safe_float(row['SDT_HOURS'])
egt = _safe_float(row['EGT_HOURS'])
nst = _safe_float(row['NST_HOURS'])
total = _safe_float(row['TOTAL_HOURS'])
# Map WORKCENTERNAME to WORKCENTER_GROUP
wc_info = wc_mapping.get(wc_name, {})
wc_group = wc_info.get('group', wc_name) # Fallback to workcentername if no mapping
# Handle NaN in string fields
family = row['RESOURCEFAMILYNAME']
resource = row['RESOURCENAME']
result.append({
'workcenter': wc_group,
'family': family if not pd.isna(family) else '',
'resource': resource if not pd.isna(resource) else '',
'ou_pct': _calc_ou_pct(prd, sby, udt, sdt, egt),
'prd_hours': round(prd, 1),
'prd_pct': round(prd / total * 100, 1) if total > 0 else 0,
'sby_hours': round(sby, 1),
'sby_pct': round(sby / total * 100, 1) if total > 0 else 0,
'udt_hours': round(udt, 1),
'udt_pct': round(udt / total * 100, 1) if total > 0 else 0,
'sdt_hours': round(sdt, 1),
'sdt_pct': round(sdt / total * 100, 1) if total > 0 else 0,
'egt_hours': round(egt, 1),
'egt_pct': round(egt / total * 100, 1) if total > 0 else 0,
'nst_hours': round(nst, 1),
'nst_pct': round(nst / total * 100, 1) if total > 0 else 0,
'machine_count': 1
})
return result

View File

@@ -173,8 +173,18 @@ const MesApi = (function() {
Toast.dismiss(loadingToastId);
}
const data = await response.json();
return data;
try {
const data = await response.json();
return data;
} catch (parseError) {
// JSON parse error on successful response - don't retry
console.error(`[MesApi] ${reqId} ✗ JSON parse failed:`, parseError.message);
if (!silent) {
Toast.error('回應資料解析失敗,資料量可能過大');
}
parseError.isParseError = true;
throw parseError;
}
}
// Non-OK response
@@ -205,6 +215,14 @@ const MesApi = (function() {
throw error;
}
// JSON parse error on successful response - don't retry
if (error.isParseError) {
if (loadingToastId) {
Toast.dismiss(loadingToastId);
}
throw error;
}
lastError = error;
}

View File

@@ -154,6 +154,9 @@
{% if can_view_page('/excel-query') %}
<button class="tab" data-target="excelQueryFrame">Excel 批次查詢</button>
{% endif %}
{% if can_view_page('/resource-history') %}
<button class="tab" data-target="resourceHistoryFrame">機台歷史分析</button>
{% endif %}
</div>
<div class="panel">
@@ -170,6 +173,9 @@
{% if can_view_page('/excel-query') %}
<iframe id="excelQueryFrame" data-src="/excel-query" title="Excel 批次查詢"></iframe>
{% endif %}
{% if can_view_page('/resource-history') %}
<iframe id="resourceHistoryFrame" data-src="/resource-history" title="機台歷史分析"></iframe>
{% endif %}
</div>
</div>
{% endblock %}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,312 @@
# -*- coding: utf-8 -*-
"""End-to-end tests for resource history analysis page.
These tests simulate real user workflows through the resource history analysis feature.
Run with: pytest tests/e2e/test_resource_history_e2e.py -v --run-integration
"""
import json
import pytest
from unittest.mock import patch, MagicMock
import pandas as pd
from datetime import datetime, timedelta
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
@pytest.fixture
def app():
"""Create application for testing."""
db._ENGINE = None
app = create_app('testing')
app.config['TESTING'] = True
return app
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
class TestResourceHistoryPageAccess:
"""E2E tests for page access and navigation."""
def test_page_loads_successfully(self, client):
"""Resource history page should load without errors."""
response = client.get('/resource-history')
assert response.status_code == 200
content = response.data.decode('utf-8')
assert '機台歷史表現分析' in content
def test_page_contains_filter_elements(self, client):
"""Page should contain all filter elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
# Check for filter elements
assert 'startDate' in content
assert 'endDate' in content
# Multi-select dropdowns
assert 'workcenterGroupsDropdown' in content
assert 'familiesDropdown' in content
assert 'isProduction' in content
assert 'isKey' in content
assert 'isMonitor' in content
def test_page_contains_kpi_cards(self, client):
"""Page should contain KPI card elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
assert 'kpiOuPct' in content
assert 'kpiPrdHours' in content
assert 'kpiUdtHours' in content
assert 'kpiSdtHours' in content
assert 'kpiEgtHours' in content
assert 'kpiMachineCount' in content
def test_page_contains_chart_containers(self, client):
"""Page should contain chart container elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
assert 'trendChart' in content
assert 'stackedChart' in content
assert 'comparisonChart' in content
assert 'heatmapChart' in content
def test_page_contains_table_elements(self, client):
"""Page should contain table elements."""
response = client.get('/resource-history')
content = response.data.decode('utf-8')
assert 'detailTableBody' in content
assert 'expandAllBtn' in content
assert 'collapseAllBtn' in content
assert 'exportBtn' in content
class TestResourceHistoryAPIWorkflow:
"""E2E tests for API workflows."""
@patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
@patch('mes_dashboard.services.filter_cache.get_resource_families')
def test_filter_options_workflow(self, mock_families, mock_groups, client):
"""Filter options should be loadable."""
mock_groups.return_value = [
{'name': '焊接_DB', 'sequence': 1},
{'name': '焊接_WB', 'sequence': 2},
{'name': '成型', 'sequence': 4},
]
mock_families.return_value = ['FAM001', 'FAM002']
response = client.get('/api/resource/history/options')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert 'workcenter_groups' in data['data']
assert 'families' in data['data']
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_complete_query_workflow(self, mock_read_sql, client):
"""Complete query workflow should return all data sections."""
# Mock responses for the 4 queries in query_summary
kpi_df = pd.DataFrame([{
'PRD_HOURS': 8000, 'SBY_HOURS': 1000, 'UDT_HOURS': 500,
'SDT_HOURS': 300, 'EGT_HOURS': 200, 'NST_HOURS': 1000,
'MACHINE_COUNT': 100
}])
trend_df = pd.DataFrame([
{'DATA_DATE': datetime(2024, 1, 1), 'PRD_HOURS': 1000, 'SBY_HOURS': 100,
'UDT_HOURS': 50, 'SDT_HOURS': 30, 'EGT_HOURS': 20, 'NST_HOURS': 100, 'MACHINE_COUNT': 100},
{'DATA_DATE': datetime(2024, 1, 2), 'PRD_HOURS': 1100, 'SBY_HOURS': 90,
'UDT_HOURS': 40, 'SDT_HOURS': 25, 'EGT_HOURS': 15, 'NST_HOURS': 100, 'MACHINE_COUNT': 100},
])
heatmap_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 400, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10},
{'WORKCENTERNAME': '成型', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 600, 'SBY_HOURS': 50, 'UDT_HOURS': 25, 'SDT_HOURS': 15, 'EGT_HOURS': 10},
])
comparison_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'PRD_HOURS': 4000, 'SBY_HOURS': 500,
'UDT_HOURS': 250, 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'MACHINE_COUNT': 50},
{'WORKCENTERNAME': '成型', 'PRD_HOURS': 4000, 'SBY_HOURS': 500,
'UDT_HOURS': 250, 'SDT_HOURS': 150, 'EGT_HOURS': 100, 'MACHINE_COUNT': 50},
])
# Use function-based side_effect for ThreadPoolExecutor parallel queries
def mock_sql(sql):
sql_upper = sql.upper()
if 'DATA_DATE' in sql_upper and 'WORKCENTERNAME' in sql_upper:
return heatmap_df
elif 'DATA_DATE' in sql_upper:
return trend_df
elif 'WORKCENTERNAME' in sql_upper:
return comparison_df
else:
return kpi_df
mock_read_sql.side_effect = mock_sql
response = client.get(
'/api/resource/history/summary'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
'&granularity=day'
)
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
# Verify KPI
assert data['data']['kpi']['ou_pct'] == 80.0
assert data['data']['kpi']['machine_count'] == 100
# Verify trend
assert len(data['data']['trend']) == 2
# Verify heatmap
assert len(data['data']['heatmap']) == 2
# Verify comparison
assert len(data['data']['workcenter_comparison']) == 2
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_detail_query_workflow(self, mock_read_sql, client):
"""Detail query workflow should return hierarchical data."""
detail_df = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES002',
'PRD_HOURS': 75, 'SBY_HOURS': 15, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
mock_read_sql.return_value = detail_df
response = client.get(
'/api/resource/history/detail'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
)
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['total'] == 2
assert len(data['data']) == 2
assert data['truncated'] is False
# Verify data structure
first_row = data['data'][0]
assert 'workcenter' in first_row
assert 'family' in first_row
assert 'resource' in first_row
assert 'ou_pct' in first_row
assert 'prd_hours' in first_row
assert 'prd_pct' in first_row
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_export_workflow(self, mock_read_sql, client):
"""Export workflow should return valid CSV."""
mock_read_sql.return_value = pd.DataFrame([
{'WORKCENTERNAME': '焊接_DB', 'RESOURCEFAMILYNAME': 'FAM001', 'RESOURCENAME': 'RES001',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5, 'SDT_HOURS': 3, 'EGT_HOURS': 2,
'NST_HOURS': 10, 'TOTAL_HOURS': 110},
])
response = client.get(
'/api/resource/history/export'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
)
assert response.status_code == 200
assert 'text/csv' in response.content_type
content = response.data.decode('utf-8-sig')
lines = content.strip().split('\n')
# Should have header + data rows
assert len(lines) >= 2
# Verify header
header = lines[0]
assert '站點' in header
assert 'OU%' in header
class TestResourceHistoryValidation:
"""E2E tests for input validation."""
def test_date_range_validation(self, client):
"""Date range exceeding 730 days should be rejected."""
response = client.get(
'/api/resource/history/summary'
'?start_date=2024-01-01'
'&end_date=2026-01-02'
)
assert response.status_code == 400
data = json.loads(response.data)
assert data['success'] is False
assert '730' in data['error']
def test_missing_required_params(self, client):
"""Missing required parameters should return error."""
response = client.get('/api/resource/history/summary')
assert response.status_code == 400
data = json.loads(response.data)
assert data['success'] is False
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_granularity_options(self, mock_read_sql, client):
"""Different granularity options should work."""
mock_df = pd.DataFrame([{
'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10, 'MACHINE_COUNT': 5
}])
mock_read_sql.return_value = mock_df
for granularity in ['day', 'week', 'month', 'year']:
mock_read_sql.side_effect = [mock_df, pd.DataFrame(), pd.DataFrame(), pd.DataFrame()]
response = client.get(
f'/api/resource/history/summary'
f'?start_date=2024-01-01'
f'&end_date=2024-01-31'
f'&granularity={granularity}'
)
assert response.status_code == 200, f"Failed for granularity={granularity}"
class TestResourceHistoryNavigation:
"""E2E tests for navigation integration."""
def test_portal_includes_history_tab(self, client):
"""Portal should include resource history tab."""
response = client.get('/')
content = response.data.decode('utf-8')
assert '機台歷史分析' in content
assert 'resourceHistoryFrame' in content
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
"""Integration tests for resource history API endpoints.
Tests API endpoints for proper response format, error handling,
and parameter validation.
"""
import unittest
from unittest.mock import patch, MagicMock
import json
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
import mes_dashboard.core.database as db
from mes_dashboard.app import create_app
class TestResourceHistoryOptionsAPI(unittest.TestCase):
"""Integration tests for /api/resource/history/options endpoint."""
def setUp(self):
"""Set up test client."""
db._ENGINE = None
self.app = create_app('testing')
self.app.config['TESTING'] = True
self.client = self.app.test_client()
@patch('mes_dashboard.routes.resource_history_routes.get_filter_options')
def test_options_success(self, mock_get_options):
"""Successful options request should return workcenter_groups and families."""
mock_get_options.return_value = {
'workcenter_groups': [
{'name': '焊接_DB', 'sequence': 1},
{'name': '成型', 'sequence': 4}
],
'families': ['FAM01', 'FAM02']
}
response = self.client.get('/api/resource/history/options')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertTrue(data['success'])
self.assertIn('data', data)
self.assertEqual(len(data['data']['workcenter_groups']), 2)
self.assertEqual(data['data']['workcenter_groups'][0]['name'], '焊接_DB')
self.assertEqual(data['data']['families'], ['FAM01', 'FAM02'])
@patch('mes_dashboard.routes.resource_history_routes.get_filter_options')
def test_options_failure(self, mock_get_options):
"""Failed options request should return error."""
mock_get_options.return_value = None
response = self.client.get('/api/resource/history/options')
self.assertEqual(response.status_code, 500)
data = json.loads(response.data)
self.assertFalse(data['success'])
self.assertIn('error', data)
class TestResourceHistorySummaryAPI(unittest.TestCase):
"""Integration tests for /api/resource/history/summary endpoint."""
def setUp(self):
"""Set up test client."""
db._ENGINE = None
self.app = create_app('testing')
self.app.config['TESTING'] = True
self.client = self.app.test_client()
def test_missing_start_date(self):
"""Missing start_date should return 400."""
response = self.client.get('/api/resource/history/summary?end_date=2024-01-31')
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertFalse(data['success'])
self.assertIn('start_date', data['error'])
def test_missing_end_date(self):
"""Missing end_date should return 400."""
response = self.client.get('/api/resource/history/summary?start_date=2024-01-01')
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertFalse(data['success'])
self.assertIn('end_date', data['error'])
@patch('mes_dashboard.routes.resource_history_routes.query_summary')
def test_date_range_exceeds_limit(self, mock_query):
"""Date range exceeding 730 days should return error."""
mock_query.return_value = {'error': '查詢範圍不可超過 730 天(兩年)'}
response = self.client.get(
'/api/resource/history/summary?start_date=2024-01-01&end_date=2026-01-02'
)
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertFalse(data['success'])
self.assertIn('730', data['error'])
@patch('mes_dashboard.routes.resource_history_routes.query_summary')
def test_successful_summary(self, mock_query):
"""Successful summary request should return all data sections."""
mock_query.return_value = {
'kpi': {
'ou_pct': 80.0,
'prd_hours': 800,
'sby_hours': 100,
'udt_hours': 50,
'sdt_hours': 30,
'egt_hours': 20,
'nst_hours': 100,
'machine_count': 10
},
'trend': [{'date': '2024-01-01', 'ou_pct': 80.0}],
'heatmap': [{'workcenter': 'WC01', 'date': '2024-01-01', 'ou_pct': 80.0}],
'workcenter_comparison': [{'workcenter': 'WC01', 'ou_pct': 80.0}]
}
response = self.client.get(
'/api/resource/history/summary?start_date=2024-01-01&end_date=2024-01-07'
)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertTrue(data['success'])
self.assertIn('kpi', data['data'])
self.assertIn('trend', data['data'])
self.assertIn('heatmap', data['data'])
self.assertIn('workcenter_comparison', data['data'])
@patch('mes_dashboard.routes.resource_history_routes.query_summary')
def test_summary_with_filters(self, mock_query):
"""Summary with filters should pass them to service."""
mock_query.return_value = {'kpi': {}, 'trend': [], 'heatmap': [], 'workcenter_comparison': []}
response = self.client.get(
'/api/resource/history/summary'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
'&granularity=week'
'&workcenter_groups=焊接_DB'
'&workcenter_groups=成型'
'&families=FAM01'
'&families=FAM02'
'&is_production=1'
'&is_key=1'
)
self.assertEqual(response.status_code, 200)
mock_query.assert_called_once()
call_kwargs = mock_query.call_args[1]
self.assertEqual(call_kwargs['granularity'], 'week')
self.assertEqual(call_kwargs['workcenter_groups'], ['焊接_DB', '成型'])
self.assertEqual(call_kwargs['families'], ['FAM01', 'FAM02'])
self.assertTrue(call_kwargs['is_production'])
self.assertTrue(call_kwargs['is_key'])
class TestResourceHistoryDetailAPI(unittest.TestCase):
"""Integration tests for /api/resource/history/detail endpoint."""
def setUp(self):
"""Set up test client."""
db._ENGINE = None
self.app = create_app('testing')
self.app.config['TESTING'] = True
self.client = self.app.test_client()
def test_missing_dates(self):
"""Missing dates should return 400."""
response = self.client.get('/api/resource/history/detail')
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertFalse(data['success'])
@patch('mes_dashboard.routes.resource_history_routes.query_detail')
def test_successful_detail(self, mock_query):
"""Successful detail request should return data with total and truncated flag."""
mock_query.return_value = {
'data': [
{'workcenter': 'WC01', 'family': 'FAM01', 'resource': 'RES01', 'ou_pct': 80.0}
],
'total': 100,
'truncated': False,
'max_records': None
}
response = self.client.get(
'/api/resource/history/detail?start_date=2024-01-01&end_date=2024-01-07'
)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertTrue(data['success'])
self.assertIn('data', data)
self.assertIn('total', data)
self.assertIn('truncated', data)
self.assertFalse(data['truncated'])
@patch('mes_dashboard.routes.resource_history_routes.query_detail')
def test_detail_truncated_warning(self, mock_query):
"""Detail with truncated data should return truncated flag and max_records."""
mock_query.return_value = {
'data': [{'workcenter': 'WC01', 'family': 'FAM01', 'resource': 'RES01', 'ou_pct': 80.0}],
'total': 6000,
'truncated': True,
'max_records': 5000
}
response = self.client.get(
'/api/resource/history/detail'
'?start_date=2024-01-01'
'&end_date=2024-01-07'
)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertTrue(data['success'])
self.assertTrue(data['truncated'])
self.assertEqual(data['max_records'], 5000)
self.assertEqual(data['total'], 6000)
class TestResourceHistoryExportAPI(unittest.TestCase):
"""Integration tests for /api/resource/history/export endpoint."""
def setUp(self):
"""Set up test client."""
db._ENGINE = None
self.app = create_app('testing')
self.app.config['TESTING'] = True
self.client = self.app.test_client()
def test_missing_dates(self):
"""Missing dates should return 400."""
response = self.client.get('/api/resource/history/export')
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertFalse(data['success'])
@patch('mes_dashboard.routes.resource_history_routes.export_csv')
def test_successful_export(self, mock_export):
"""Successful export should return CSV with correct headers."""
mock_export.return_value = iter(['站點,型號,機台,OU%\n', 'WC01,FAM01,RES01,80%\n'])
response = self.client.get(
'/api/resource/history/export?start_date=2024-01-01&end_date=2024-01-07'
)
self.assertEqual(response.status_code, 200)
self.assertIn('text/csv', response.content_type)
self.assertIn('attachment', response.headers['Content-Disposition'])
self.assertIn('resource_history', response.headers['Content-Disposition'])
@patch('mes_dashboard.routes.resource_history_routes.export_csv')
def test_export_filename_includes_dates(self, mock_export):
"""Export filename should include date range."""
mock_export.return_value = iter(['header\n'])
response = self.client.get(
'/api/resource/history/export?start_date=2024-01-01&end_date=2024-01-07'
)
self.assertIn('2024-01-01', response.headers['Content-Disposition'])
self.assertIn('2024-01-07', response.headers['Content-Disposition'])
class TestAPIContentType(unittest.TestCase):
"""Test that APIs return proper content types."""
def setUp(self):
"""Set up test client."""
db._ENGINE = None
self.app = create_app('testing')
self.app.config['TESTING'] = True
self.client = self.app.test_client()
@patch('mes_dashboard.routes.resource_history_routes.get_filter_options')
def test_json_content_type(self, mock_get_options):
"""API endpoints should return application/json content type."""
mock_get_options.return_value = {'workcenter_groups': [], 'families': []}
response = self.client.get('/api/resource/history/options')
self.assertIn('application/json', response.content_type)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,374 @@
# -*- coding: utf-8 -*-
"""Unit tests for resource_history_service.py.
Tests the service layer functions for resource history analysis.
"""
import unittest
from unittest.mock import patch, MagicMock
from datetime import datetime, timedelta
import pandas as pd
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from mes_dashboard.services.resource_history_service import (
get_filter_options,
query_summary,
query_detail,
export_csv,
_validate_date_range,
_get_date_trunc,
_calc_ou_pct,
_build_kpi_from_df,
_build_detail_from_df,
MAX_QUERY_DAYS,
)
class TestValidateDateRange(unittest.TestCase):
"""Test date range validation."""
def test_valid_date_range(self):
"""Valid date range should return None."""
result = _validate_date_range('2024-01-01', '2024-01-31')
self.assertIsNone(result)
def test_date_range_exceeds_max(self):
"""Date range exceeding MAX_QUERY_DAYS should return error message."""
result = _validate_date_range('2024-01-01', '2026-01-02')
self.assertIsNotNone(result)
self.assertIn('730', result)
def test_end_date_before_start_date(self):
"""End date before start date should return error message."""
result = _validate_date_range('2024-01-31', '2024-01-01')
self.assertIsNotNone(result)
self.assertIn('起始日期', result)
def test_invalid_date_format(self):
"""Invalid date format should return error message."""
result = _validate_date_range('invalid', '2024-01-01')
self.assertIsNotNone(result)
self.assertIn('日期格式錯誤', result)
class TestGetDateTrunc(unittest.TestCase):
"""Test date truncation SQL generation."""
def test_day_granularity(self):
"""Day granularity should use TRUNC without format."""
result = _get_date_trunc('day')
self.assertIn('TRUNC(ss.TXNDATE)', result)
self.assertNotIn('IW', result)
def test_week_granularity(self):
"""Week granularity should use TRUNC with IW format."""
result = _get_date_trunc('week')
self.assertIn("'IW'", result)
def test_month_granularity(self):
"""Month granularity should use TRUNC with MM format."""
result = _get_date_trunc('month')
self.assertIn("'MM'", result)
def test_year_granularity(self):
"""Year granularity should use TRUNC with YYYY format."""
result = _get_date_trunc('year')
self.assertIn("'YYYY'", result)
def test_unknown_granularity(self):
"""Unknown granularity should default to day."""
result = _get_date_trunc('unknown')
self.assertIn('TRUNC(ss.TXNDATE)', result)
self.assertNotIn("'IW'", result)
class TestCalcOuPct(unittest.TestCase):
"""Test OU% calculation."""
def test_normal_calculation(self):
"""OU% should be calculated correctly."""
# PRD=800, SBY=100, UDT=50, SDT=30, EGT=20
# OU% = 800 / (800+100+50+30+20) * 100 = 80%
result = _calc_ou_pct(800, 100, 50, 30, 20)
self.assertEqual(result, 80.0)
def test_zero_denominator(self):
"""Zero denominator should return 0, not error."""
result = _calc_ou_pct(0, 0, 0, 0, 0)
self.assertEqual(result, 0)
def test_all_prd(self):
"""100% PRD should result in 100% OU."""
result = _calc_ou_pct(100, 0, 0, 0, 0)
self.assertEqual(result, 100.0)
def test_no_prd(self):
"""No PRD should result in 0% OU."""
result = _calc_ou_pct(0, 100, 50, 30, 20)
self.assertEqual(result, 0)
class TestBuildKpiFromDf(unittest.TestCase):
"""Test KPI building from DataFrame."""
def test_empty_dataframe(self):
"""Empty DataFrame should return default KPI values."""
df = pd.DataFrame()
result = _build_kpi_from_df(df)
self.assertEqual(result['ou_pct'], 0)
self.assertEqual(result['prd_hours'], 0)
self.assertEqual(result['machine_count'], 0)
def test_normal_dataframe(self):
"""Normal DataFrame should build correct KPI."""
df = pd.DataFrame([{
'PRD_HOURS': 800,
'SBY_HOURS': 100,
'UDT_HOURS': 50,
'SDT_HOURS': 30,
'EGT_HOURS': 20,
'NST_HOURS': 100,
'MACHINE_COUNT': 10
}])
result = _build_kpi_from_df(df)
self.assertEqual(result['ou_pct'], 80.0)
self.assertEqual(result['prd_hours'], 800)
self.assertEqual(result['machine_count'], 10)
def test_none_values_in_dataframe(self):
"""None values should be treated as 0."""
df = pd.DataFrame([{
'PRD_HOURS': None,
'SBY_HOURS': None,
'UDT_HOURS': None,
'SDT_HOURS': None,
'EGT_HOURS': None,
'NST_HOURS': None,
'MACHINE_COUNT': None
}])
result = _build_kpi_from_df(df)
self.assertEqual(result['ou_pct'], 0)
self.assertEqual(result['prd_hours'], 0)
self.assertEqual(result['machine_count'], 0)
class TestBuildDetailFromDf(unittest.TestCase):
"""Test detail data building from DataFrame."""
def test_empty_dataframe(self):
"""Empty DataFrame should return empty list."""
df = pd.DataFrame()
result = _build_detail_from_df(df)
self.assertEqual(result, [])
def test_normal_dataframe(self):
"""Normal DataFrame should build correct detail data."""
df = pd.DataFrame([{
'WORKCENTERNAME': 'WC01',
'RESOURCEFAMILYNAME': 'FAM01',
'RESOURCENAME': 'RES01',
'PRD_HOURS': 80,
'SBY_HOURS': 10,
'UDT_HOURS': 5,
'SDT_HOURS': 3,
'EGT_HOURS': 2,
'NST_HOURS': 10,
'TOTAL_HOURS': 110
}])
result = _build_detail_from_df(df)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['workcenter'], 'WC01')
self.assertEqual(result[0]['family'], 'FAM01')
self.assertEqual(result[0]['resource'], 'RES01')
self.assertEqual(result[0]['machine_count'], 1)
# OU% = 80 / (80+10+5+3+2) * 100 = 80%
self.assertEqual(result[0]['ou_pct'], 80.0)
class TestGetFilterOptions(unittest.TestCase):
"""Test filter options retrieval."""
@patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
@patch('mes_dashboard.services.filter_cache.get_resource_families')
def test_cache_failure(self, mock_families, mock_groups):
"""Cache failure should return None."""
mock_groups.return_value = None
mock_families.return_value = None
result = get_filter_options()
self.assertIsNone(result)
@patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
@patch('mes_dashboard.services.filter_cache.get_resource_families')
def test_successful_query(self, mock_families, mock_groups):
"""Successful query should return workcenter groups and families."""
mock_groups.return_value = [
{'name': '焊接_DB', 'sequence': 1},
{'name': '成型', 'sequence': 4},
]
mock_families.return_value = ['FAM01', 'FAM02']
result = get_filter_options()
self.assertIsNotNone(result)
self.assertEqual(len(result['workcenter_groups']), 2)
self.assertEqual(result['workcenter_groups'][0]['name'], '焊接_DB')
self.assertEqual(result['families'], ['FAM01', 'FAM02'])
class TestQuerySummary(unittest.TestCase):
"""Test summary query function."""
def test_invalid_date_range(self):
"""Invalid date range should return error."""
result = query_summary(
start_date='2024-01-01',
end_date='2026-01-02', # More than 730 days
granularity='day'
)
self.assertIsNotNone(result)
self.assertIn('error', result)
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_successful_query(self, mock_read_sql):
"""Successful query should return all sections."""
# Mock data for all queries
kpi_df = pd.DataFrame([{
'PRD_HOURS': 800, 'SBY_HOURS': 100, 'UDT_HOURS': 50,
'SDT_HOURS': 30, 'EGT_HOURS': 20, 'NST_HOURS': 100,
'MACHINE_COUNT': 10
}])
trend_df = pd.DataFrame([{
'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10,
'MACHINE_COUNT': 5
}])
heatmap_df = pd.DataFrame([{
'WORKCENTERNAME': 'WC01', 'DATA_DATE': datetime(2024, 1, 1),
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2
}])
comparison_df = pd.DataFrame([{
'WORKCENTERNAME': 'WC01',
'PRD_HOURS': 800, 'SBY_HOURS': 100, 'UDT_HOURS': 50,
'SDT_HOURS': 30, 'EGT_HOURS': 20, 'MACHINE_COUNT': 10
}])
# Use a function to return appropriate mock based on SQL content
# (ThreadPoolExecutor runs queries in parallel, so side_effect list is unreliable)
def mock_sql(sql):
sql_upper = sql.upper()
if 'DATA_DATE' in sql_upper and 'WORKCENTERNAME' in sql_upper:
return heatmap_df # heatmap has both DATA_DATE and WORKCENTERNAME
elif 'DATA_DATE' in sql_upper:
return trend_df # trend has DATA_DATE but no WORKCENTERNAME
elif 'WORKCENTERNAME' in sql_upper:
return comparison_df # comparison has WORKCENTERNAME but no DATA_DATE
else:
return kpi_df # kpi has neither
mock_read_sql.side_effect = mock_sql
result = query_summary(
start_date='2024-01-01',
end_date='2024-01-07',
granularity='day'
)
self.assertIsNotNone(result)
self.assertIn('kpi', result)
self.assertIn('trend', result)
self.assertIn('heatmap', result)
self.assertIn('workcenter_comparison', result)
class TestQueryDetail(unittest.TestCase):
"""Test detail query function."""
def test_invalid_date_range(self):
"""Invalid date range should return error."""
result = query_detail(
start_date='2024-01-01',
end_date='2026-01-02', # More than 730 days
granularity='day'
)
self.assertIsNotNone(result)
self.assertIn('error', result)
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_successful_query(self, mock_read_sql):
"""Successful query should return data with total count."""
# Mock detail query
detail_df = pd.DataFrame([{
'WORKCENTERNAME': 'WC01',
'RESOURCEFAMILYNAME': 'FAM01',
'RESOURCENAME': 'RES01',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10,
'TOTAL_HOURS': 110
}])
mock_read_sql.return_value = detail_df
result = query_detail(
start_date='2024-01-01',
end_date='2024-01-07',
granularity='day',
)
self.assertIsNotNone(result)
self.assertIn('data', result)
self.assertIn('total', result)
self.assertIn('truncated', result)
self.assertEqual(result['total'], 1)
self.assertFalse(result['truncated'])
class TestExportCsv(unittest.TestCase):
"""Test CSV export function."""
def test_invalid_date_range(self):
"""Invalid date range should yield error."""
result = list(export_csv(
start_date='2024-01-01',
end_date='2026-01-02', # More than 730 days
))
self.assertTrue(any('Error' in r for r in result))
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
def test_successful_export(self, mock_read_sql):
"""Successful export should yield CSV rows."""
mock_read_sql.return_value = pd.DataFrame([{
'WORKCENTERNAME': 'WC01',
'RESOURCEFAMILYNAME': 'FAM01',
'RESOURCENAME': 'RES01',
'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10,
'TOTAL_HOURS': 110
}])
result = list(export_csv(
start_date='2024-01-01',
end_date='2024-01-07',
))
# Should have header row + data row
self.assertGreaterEqual(len(result), 2)
# Header should contain column names
self.assertIn('站點', result[0])
self.assertIn('OU%', result[0])
if __name__ == '__main__':
unittest.main()