feat: 新增批次追蹤工具 (Query Tool)

新增完整的批次追蹤和設備時段查詢功能:

批次追蹤功能:
- 支援 LOT ID / 流水號 / GA工單 三種查詢方式
- 生產歷程查詢 (DW_MES_LOTWIPHISTORY)
- 前後批比對 (ROW_NUMBER 窗口函數)
- 關聯資料查詢 (物料/不良/HOLD/JOB)
- TMTT 成品流水號對應查詢

設備時段查詢功能:
- 設備狀態時數統計
- 批次清單查詢
- 物料消耗彙總
- 不良統計
- JOB 紀錄查詢

技術改進:
- 新增 read_sql_df_slow() 支援慢查詢專用連線和超時控制
- 修正時區處理使用 TW_TIMEZONE (GMT+8)
- 新增 15 個 SQL 查詢檔案
- 完整的單元測試和 API 測試

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2026-02-05 19:42:54 +08:00
parent dd520641d1
commit 94e5d8c5c3
30 changed files with 7269 additions and 11 deletions

2
.gitignore vendored
View File

@@ -41,7 +41,7 @@ logs/
.claude/
.codex/
CLAUDE.md
openspec/
# Test artifacts
.pytest_cache/
.coverage

View File

@@ -44,6 +44,11 @@
"route": "/job-query",
"name": "設備維修查詢",
"status": "released"
},
{
"route": "/query-tool",
"name": "批次追蹤工具",
"status": "released"
}
],
"api_public": true,

View File

@@ -506,6 +506,7 @@ logs = store.query_logs(
| hold | `/api/hold` | `hold_routes.py` |
| resource_history | `/api/resource-history` | `resource_history_routes.py` |
| job_query | `/api/job-query` | `job_query_routes.py` |
| query_tool | `/api/query-tool` | `query_tool_routes.py` |
| admin | `/admin` | `admin_routes.py` |
| auth | `/admin` | `auth_routes.py` |
| health | `/` | `health_routes.py` |
@@ -913,6 +914,155 @@ def read_sql_df(sql, params):
---
## 23. 批次追蹤工具 (Query Tool)
### 位置
- 服務: `mes_dashboard.services.query_tool_service`
- 路由: `mes_dashboard.routes.query_tool_routes`
- SQL: `mes_dashboard/sql/query_tool/`
### 功能概述
提供批次追蹤和設備時段查詢功能:
- **批次追蹤**: LOT ID / 流水號 / 工單 → 生產歷程 → 前後批 → 關聯資料
- **設備時段查詢**: 設備狀態時數、批次清單、物料消耗、不良統計、JOB 紀錄
### 核心概念CONTAINERID vs CONTAINERNAME
| 欄位 | 說明 | 用途 |
|------|------|------|
| CONTAINERID | 16 位 hex 內部 ID (例: `488103800029578b`) | 資料庫主鍵,所有歷史表使用 |
| CONTAINERNAME | 用戶可見 LOT ID (例: `GA23100020-A00-011`) | 前端顯示 |
歷史資料表LOTWIPHISTORY、LOTMATERIALSHISTORY 等)使用 CONTAINERID 作為主鍵。
查詢時需 JOIN DW_MES_CONTAINER 取得 CONTAINERNAME 顯示。
### 設備 ID 關係
```
EQUIPMENTID (歷史表) = RESOURCEID (DW_MES_RESOURCE) = HISTORYID (狀態歷史表)
```
### 輸入限制
| 類型 | 上限 | 說明 |
|------|------|------|
| LOT ID | 50 | 直接輸入批號 |
| 流水號 | 50 | 可能一對多展開 |
| 工單 | 10 | 一工單可展開 100+ 批次 |
| 設備 | 20 | 時段查詢 |
| 日期範圍 | 90 天 | 設備時段查詢 |
### API 端點
| 端點 | 方法 | 用途 |
|------|------|------|
| `/api/query-tool/resolve` | POST | 解析輸入為 CONTAINERID |
| `/api/query-tool/lot-history` | GET | 取得 LOT 生產歷程 |
| `/api/query-tool/adjacent-lots` | GET | 取得前後批 |
| `/api/query-tool/lot-associations` | GET | 取得關聯資料 (物料/不良/HOLD/JOB) |
| `/api/query-tool/equipment-period` | POST | 設備時段查詢 |
| `/api/query-tool/equipment-list` | GET | 取得設備清單 |
| `/api/query-tool/export-csv` | POST | CSV 匯出 |
### SQL 檔案
```
sql/query_tool/
├── lot_resolve_id.sql # LOT ID → CONTAINERID
├── lot_resolve_sn.sql # 流水號 → CONTAINERID
├── lot_resolve_wo.sql # 工單 → CONTAINERID
├── lot_history.sql # LOT 生產歷程
├── adjacent_lots.sql # 前後批查詢 (ROW_NUMBER)
├── lot_materials.sql # LOT 物料消耗
├── lot_rejects.sql # LOT 不良紀錄
├── lot_holds.sql # LOT HOLD 紀錄
├── lot_jobs.sql # LOT 相關 JOB
├── lot_splits.sql # TMTT 成品流水號對應
├── lot_split_merge_history.sql # 完整拆併批歷史 (DW_MES_HM_LOTMOVEOUT)
├── equipment_status_hours.sql # 設備狀態時數
├── equipment_lots.sql # 設備批次清單
├── equipment_materials.sql # 設備物料消耗彙總
├── equipment_rejects.sql # 設備不良統計
└── equipment_jobs.sql # 設備 JOB 紀錄
```
### 拆併批歷史追蹤
資料來源:`DW_MES_HM_LOTMOVEOUT` (批次出站事件歷史表)
篩選條件:
- `CDONAME IN ('SplitLot', 'CombineLot')`
操作類型 (CALLBYCDONAME)
| 值 | 說明 |
|---|------|
| AssemblyMotherLotSchePrep | 母批排程拆併批 |
| LotSplit | 標準拆批 |
| PJ_TMTTCombine | TMTT 併批 |
關鍵欄位:
| 欄位 | 說明 |
|------|------|
| CONTAINERID / CONTAINERNAME | 目標批次 |
| FROMCONTAINERID / FROMCONTAINERNAME | 來源批次 |
| FROMQTY / QTY | 操作前後數量 |
| TXNDATE | 交易時間 |
| WORKCENTER | 站點 |
| EMPLOYEENAME | 操作人員 |
### 前後批查詢邏輯
使用 ROW_NUMBER() 窗口函數計算相對位置:
```sql
WITH ranked_lots AS (
SELECT
h.*,
ROW_NUMBER() OVER (
PARTITION BY h.EQUIPMENTID, h.SPECNAME
ORDER BY h.TRACKINTIMESTAMP
) AS rn
FROM DWH.DW_MES_LOTWIPHISTORY h
WHERE h.EQUIPMENTID = :equipment_id
AND h.SPECNAME = :spec_name
AND h.TRACKINTIMESTAMP BETWEEN :time_start AND :time_end
)
SELECT *, r.rn - t.target_rn AS relative_position
FROM ranked_lots r, target_lot t
WHERE r.rn BETWEEN (t.target_rn - 3) AND (t.target_rn + 3)
```
### 服務函數
```python
from mes_dashboard.services.query_tool_service import (
# LOT 解析
resolve_lots, # 統一入口
# LOT 查詢
get_lot_history, # 生產歷程
get_adjacent_lots, # 前後批
get_lot_materials, # 物料消耗
get_lot_rejects, # 不良紀錄
get_lot_holds, # HOLD 紀錄
get_lot_jobs, # JOB 紀錄
# 設備查詢
get_equipment_status_hours, # 狀態時數
get_equipment_lots, # 批次清單
get_equipment_materials, # 物料消耗
get_equipment_rejects, # 不良統計
get_equipment_jobs, # JOB 紀錄
# 驗證
validate_date_range,
validate_lot_input,
validate_equipment_input,
)
```
---
## 參考檔案索引
| 功能 | 檔案位置 |
@@ -932,5 +1082,6 @@ def read_sql_df(sql, params):
| 頁面狀態 | `src/mes_dashboard/services/page_registry.py` |
| Filter 快取 | `src/mes_dashboard/services/filter_cache.py` |
| 資源快取 | `src/mes_dashboard/services/resource_cache.py` |
| 批次追蹤服務 | `src/mes_dashboard/services/query_tool_service.py` |
| API 客戶端 | `src/mes_dashboard/static/js/mes-api.js` |
| Toast 系統 | `src/mes_dashboard/static/js/toast.js` |

View File

@@ -208,11 +208,14 @@ def dispose_engine():
# ============================================================
def get_db_connection():
def get_db_connection(call_timeout_ms: int = 55000):
"""Create a direct oracledb connection.
Used for operations that need direct cursor access.
Includes call_timeout to prevent long-running queries from blocking workers.
Args:
call_timeout_ms: Query timeout in milliseconds (default 55000 = 55s)
"""
try:
conn = oracledb.connect(
@@ -221,10 +224,10 @@ def get_db_connection():
retry_count=1, # Retry once on connection failure
retry_delay=1, # 1s delay between retries
)
# Set call timeout to 55 seconds (must be less than Gunicorn's 60s worker timeout)
# Set call timeout (must be less than Gunicorn's 60s worker timeout)
# This prevents queries from blocking workers indefinitely
conn.call_timeout = 55000 # milliseconds
logger.debug("Direct oracledb connection established (call_timeout=55s)")
conn.call_timeout = call_timeout_ms
logger.debug(f"Direct oracledb connection established (call_timeout={call_timeout_ms}ms)")
return conn
except Exception as exc:
ora_code = _extract_ora_code(exc)
@@ -282,8 +285,12 @@ def read_sql_df(sql: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFra
try:
with engine.connect() as conn:
df = pd.read_sql(text(sql), conn, params=params)
df.columns = [str(c).upper() for c in df.columns]
# SQLAlchemy 2.0: execute text() with params dict, then convert to DataFrame
stmt = text(sql)
result = conn.execute(stmt, params or {})
rows = result.fetchall()
columns = [str(c).upper() for c in result.keys()]
df = pd.DataFrame(rows, columns=columns)
elapsed = time.time() - start_time
@@ -322,6 +329,110 @@ def read_sql_df(sql: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFra
raise
def read_sql_df_slow(
sql: str,
params: Optional[Dict[str, Any]] = None,
timeout_seconds: int = 90
) -> pd.DataFrame:
"""Execute slow SQL query with dedicated connection and timeout.
This function is designed for known slow queries that should not:
- Block the connection pool (uses direct connection)
- Affect circuit breaker state (timeouts are expected)
Args:
sql: SQL query string with Oracle bind variables (:param_name)
params: Optional dict of parameter values to bind
timeout_seconds: Query timeout in seconds (default 90s)
Returns:
DataFrame with query results. Column names are uppercased.
Raises:
TimeoutError: If query exceeds timeout (ORA-01013)
RuntimeError: If connection fails
Exception: Other database errors
Example:
>>> sql = "SELECT * FROM large_table WHERE id = :id"
>>> df = read_sql_df_slow(sql, {"id": "123"}, timeout_seconds=60)
"""
from mes_dashboard.core.metrics import record_query_latency
start_time = time.time()
# Use dedicated connection with custom timeout (not from pool)
conn = get_db_connection(call_timeout_ms=timeout_seconds * 1000)
if conn is None:
raise RuntimeError("Failed to establish database connection for slow query")
try:
cursor = conn.cursor()
# Execute with bind parameters
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
# Fetch results
columns = [desc[0].upper() for desc in cursor.description]
rows = cursor.fetchall()
# Convert to DataFrame
df = pd.DataFrame(rows, columns=columns)
elapsed = time.time() - start_time
# Record metrics (but not to circuit breaker - slow queries are expected)
record_query_latency(elapsed)
logger.info(f"Slow query completed in {elapsed:.2f}s, rows={len(df)}")
return df
except oracledb.DatabaseError as exc:
elapsed = time.time() - start_time
record_query_latency(elapsed)
error_obj = exc.args[0] if exc.args else None
ora_code = getattr(error_obj, 'code', None) or _extract_ora_code(exc)
sql_preview = sql.strip().replace('\n', ' ')[:100]
# ORA-01013: user requested cancel of current operation (timeout)
if ora_code == 1013 or str(ora_code) == '1013':
logger.warning(
f"Slow query timed out after {elapsed:.2f}s "
f"(limit: {timeout_seconds}s) | SQL: {sql_preview}..."
)
raise TimeoutError(
f"Query timed out after {timeout_seconds} seconds"
) from exc
else:
logger.error(
f"Slow query failed after {elapsed:.2f}s - ORA-{ora_code}: {exc} | "
f"SQL: {sql_preview}..."
)
raise
except Exception as exc:
elapsed = time.time() - start_time
record_query_latency(elapsed)
sql_preview = sql.strip().replace('\n', ' ')[:100]
logger.error(
f"Slow query failed after {elapsed:.2f}s: {exc} | SQL: {sql_preview}..."
)
raise
finally:
try:
conn.close()
logger.debug("Slow query connection closed")
except Exception:
pass
# ============================================================
# Table Utilities
# ============================================================

View File

@@ -13,6 +13,7 @@ from .auth_routes import auth_bp
from .admin_routes import admin_bp
from .resource_history_routes import resource_history_bp
from .job_query_routes import job_query_bp
from .query_tool_routes import query_tool_bp
def register_routes(app) -> None:
@@ -24,6 +25,7 @@ def register_routes(app) -> None:
app.register_blueprint(hold_bp)
app.register_blueprint(resource_history_bp)
app.register_blueprint(job_query_bp)
app.register_blueprint(query_tool_bp)
__all__ = [
'wip_bp',
@@ -35,5 +37,6 @@ __all__ = [
'admin_bp',
'resource_history_bp',
'job_query_bp',
'query_tool_bp',
'register_routes',
]

View File

@@ -0,0 +1,473 @@
# -*- coding: utf-8 -*-
"""Query Tool API routes.
Contains Flask Blueprint for batch tracing and equipment period query endpoints:
- LOT resolution (LOT ID / Serial Number / Work Order → CONTAINERID)
- LOT production history and adjacent lots
- LOT associations (materials, rejects, holds, jobs)
- Equipment period queries (status hours, lots, materials, rejects, jobs)
- CSV export functionality
"""
from flask import Blueprint, jsonify, request, Response, render_template
from mes_dashboard.services.query_tool_service import (
resolve_lots,
get_lot_history,
get_adjacent_lots,
get_lot_materials,
get_lot_rejects,
get_lot_holds,
get_lot_splits,
get_lot_jobs,
get_equipment_status_hours,
get_equipment_lots,
get_equipment_materials,
get_equipment_rejects,
get_equipment_jobs,
export_to_csv,
generate_csv_stream,
validate_date_range,
validate_lot_input,
validate_equipment_input,
)
# Create Blueprint
query_tool_bp = Blueprint('query_tool', __name__)
# ============================================================
# Page Route
# ============================================================
@query_tool_bp.route('/query-tool')
def query_tool_page():
"""Render the query tool page."""
return render_template('query_tool.html')
# ============================================================
# LOT Resolution API
# ============================================================
@query_tool_bp.route('/api/query-tool/resolve', methods=['POST'])
def resolve_lot_input():
"""Resolve user input to CONTAINERID list.
Expects JSON body:
{
"input_type": "lot_id" | "serial_number" | "work_order",
"values": ["value1", "value2", ...]
}
Returns:
{
"data": [{"container_id": "...", "input_value": "..."}, ...],
"total": 10,
"input_count": 5,
"not_found": ["value3"]
}
"""
data = request.get_json()
if not data:
return jsonify({'error': '請求內容不可為空'}), 400
input_type = data.get('input_type')
values = data.get('values', [])
# Validate input type
valid_types = ['lot_id', 'serial_number', 'work_order']
if input_type not in valid_types:
return jsonify({'error': f'不支援的查詢類型: {input_type}'}), 400
# Validate values
validation_error = validate_lot_input(input_type, values)
if validation_error:
return jsonify({'error': validation_error}), 400
result = resolve_lots(input_type, values)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
# ============================================================
# LOT History API
# ============================================================
@query_tool_bp.route('/api/query-tool/lot-history', methods=['GET'])
def query_lot_history():
"""Query production history for a LOT.
Query params:
container_id: CONTAINERID (16-char hex)
Returns production history records.
"""
container_id = request.args.get('container_id')
if not container_id:
return jsonify({'error': '請指定 CONTAINERID'}), 400
result = get_lot_history(container_id)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
# ============================================================
# Adjacent Lots API
# ============================================================
@query_tool_bp.route('/api/query-tool/adjacent-lots', methods=['GET'])
def query_adjacent_lots():
"""Query adjacent lots (前後批) for a specific equipment and spec.
Query params:
equipment_id: Equipment ID
spec_name: Spec name
target_time: Target lot's TRACKINTIMESTAMP (ISO format)
time_window: Time window in hours (optional, default 24)
Returns adjacent lots with relative position.
"""
equipment_id = request.args.get('equipment_id')
spec_name = request.args.get('spec_name')
target_time = request.args.get('target_time')
time_window = request.args.get('time_window', 24, type=int)
if not all([equipment_id, spec_name, target_time]):
return jsonify({'error': '請指定設備、規格和目標時間'}), 400
result = get_adjacent_lots(equipment_id, spec_name, target_time, time_window)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
# ============================================================
# LOT Associations API
# ============================================================
@query_tool_bp.route('/api/query-tool/lot-associations', methods=['GET'])
def query_lot_associations():
"""Query association data for a LOT.
Query params:
container_id: CONTAINERID (16-char hex)
type: Association type ('materials', 'rejects', 'holds', 'jobs')
equipment_id: Equipment ID (required for 'jobs' type)
time_start: Start time (required for 'jobs' type)
time_end: End time (required for 'jobs' type)
Returns association records based on type.
"""
container_id = request.args.get('container_id')
assoc_type = request.args.get('type')
if not container_id:
return jsonify({'error': '請指定 CONTAINERID'}), 400
valid_types = ['materials', 'rejects', 'holds', 'splits', 'jobs']
if assoc_type not in valid_types:
return jsonify({'error': f'不支援的關聯類型: {assoc_type}'}), 400
if assoc_type == 'materials':
result = get_lot_materials(container_id)
elif assoc_type == 'rejects':
result = get_lot_rejects(container_id)
elif assoc_type == 'holds':
result = get_lot_holds(container_id)
elif assoc_type == 'splits':
result = get_lot_splits(container_id)
elif assoc_type == 'jobs':
equipment_id = request.args.get('equipment_id')
time_start = request.args.get('time_start')
time_end = request.args.get('time_end')
if not all([equipment_id, time_start, time_end]):
return jsonify({'error': '查詢 JOB 需指定設備和時間範圍'}), 400
result = get_lot_jobs(equipment_id, time_start, time_end)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
# ============================================================
# Equipment Period Query API
# ============================================================
@query_tool_bp.route('/api/query-tool/equipment-period', methods=['POST'])
def query_equipment_period():
"""Query equipment data for a time period.
Expects JSON body:
{
"equipment_ids": ["id1", "id2", ...],
"equipment_names": ["name1", "name2", ...],
"start_date": "2024-01-01",
"end_date": "2024-01-31",
"query_type": "status_hours" | "lots" | "materials" | "rejects" | "jobs"
}
Returns data based on query_type.
"""
data = request.get_json()
if not data:
return jsonify({'error': '請求內容不可為空'}), 400
equipment_ids = data.get('equipment_ids', [])
equipment_names = data.get('equipment_names', [])
start_date = data.get('start_date')
end_date = data.get('end_date')
query_type = data.get('query_type')
# Validate date range
if not start_date or not end_date:
return jsonify({'error': '請指定日期範圍'}), 400
validation_error = validate_date_range(start_date, end_date)
if validation_error:
return jsonify({'error': validation_error}), 400
# Validate query type
valid_types = ['status_hours', 'lots', 'materials', 'rejects', 'jobs']
if query_type not in valid_types:
return jsonify({'error': f'不支援的查詢類型: {query_type}'}), 400
# Execute query based on type
if query_type == 'status_hours':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_status_hours(equipment_ids, start_date, end_date)
elif query_type == 'lots':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_lots(equipment_ids, start_date, end_date)
elif query_type == 'materials':
if not equipment_names:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_materials(equipment_names, start_date, end_date)
elif query_type == 'rejects':
if not equipment_names:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_rejects(equipment_names, start_date, end_date)
elif query_type == 'jobs':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_jobs(equipment_ids, start_date, end_date)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
# ============================================================
# Equipment List API (for selection UI)
# ============================================================
@query_tool_bp.route('/api/query-tool/equipment-list', methods=['GET'])
def get_equipment_list():
"""Get available equipment for selection.
Returns equipment from cache for equipment selection UI.
"""
from mes_dashboard.services.resource_cache import get_all_resources
try:
resources = get_all_resources()
if not resources:
return jsonify({'error': '無法載入設備資料'}), 500
# Return minimal data for selection UI
data = []
for r in resources:
data.append({
'RESOURCEID': r.get('RESOURCEID'),
'RESOURCENAME': r.get('RESOURCENAME'),
'WORKCENTERNAME': r.get('WORKCENTERNAME'),
'RESOURCEFAMILYNAME': r.get('RESOURCEFAMILYNAME'),
})
# Sort by WORKCENTERNAME, then RESOURCENAME
data.sort(key=lambda x: (x.get('WORKCENTERNAME', ''), x.get('RESOURCENAME', '')))
return jsonify({
'data': data,
'total': len(data)
})
except Exception as exc:
return jsonify({'error': f'載入設備資料失敗: {str(exc)}'}), 500
# ============================================================
# CSV Export API
# ============================================================
@query_tool_bp.route('/api/query-tool/export-csv', methods=['POST'])
def export_csv():
"""Export query results as CSV.
Expects JSON body:
{
"export_type": "lot_history" | "adjacent_lots" | "lot_materials" |
"lot_rejects" | "lot_holds" | "lot_jobs" |
"equipment_status_hours" | "equipment_lots" |
"equipment_materials" | "equipment_rejects" | "equipment_jobs",
"params": { ... query parameters ... }
}
Returns streaming CSV response.
"""
data = request.get_json()
if not data:
return jsonify({'error': '請求內容不可為空'}), 400
export_type = data.get('export_type')
params = data.get('params', {})
# Get data based on export type
result = None
filename = 'export.csv'
try:
if export_type == 'lot_history':
container_id = params.get('container_id')
if not container_id:
return jsonify({'error': '請指定 CONTAINERID'}), 400
result = get_lot_history(container_id)
filename = f'lot_history_{container_id}.csv'
elif export_type == 'adjacent_lots':
result = get_adjacent_lots(
params.get('equipment_id'),
params.get('spec_name'),
params.get('target_time'),
params.get('time_window', 24)
)
filename = 'adjacent_lots.csv'
elif export_type == 'lot_materials':
container_id = params.get('container_id')
result = get_lot_materials(container_id)
filename = f'lot_materials_{container_id}.csv'
elif export_type == 'lot_rejects':
container_id = params.get('container_id')
result = get_lot_rejects(container_id)
filename = f'lot_rejects_{container_id}.csv'
elif export_type == 'lot_holds':
container_id = params.get('container_id')
result = get_lot_holds(container_id)
filename = f'lot_holds_{container_id}.csv'
elif export_type == 'lot_splits':
container_id = params.get('container_id')
result = get_lot_splits(container_id)
# Flatten nested structure for CSV
if result and 'data' in result:
flat_data = []
for item in result['data']:
serial_number = item.get('serial_number', '')
txn_date = item.get('txn_date', '')
for lot in item.get('lots', []):
flat_data.append({
'成品流水號': serial_number,
'LOT ID': lot.get('lot_id', ''),
'規格': lot.get('spec_name', ''),
'數量': lot.get('qty', ''),
'合併序號': lot.get('combine_seq', ''),
'交易時間': txn_date,
})
result['data'] = flat_data
filename = f'lot_splits_{container_id}.csv'
elif export_type == 'lot_jobs':
result = get_lot_jobs(
params.get('equipment_id'),
params.get('time_start'),
params.get('time_end')
)
filename = 'lot_jobs.csv'
elif export_type == 'equipment_status_hours':
result = get_equipment_status_hours(
params.get('equipment_ids', []),
params.get('start_date'),
params.get('end_date')
)
filename = 'equipment_status_hours.csv'
elif export_type == 'equipment_lots':
result = get_equipment_lots(
params.get('equipment_ids', []),
params.get('start_date'),
params.get('end_date')
)
filename = 'equipment_lots.csv'
elif export_type == 'equipment_materials':
result = get_equipment_materials(
params.get('equipment_names', []),
params.get('start_date'),
params.get('end_date')
)
filename = 'equipment_materials.csv'
elif export_type == 'equipment_rejects':
result = get_equipment_rejects(
params.get('equipment_names', []),
params.get('start_date'),
params.get('end_date')
)
filename = 'equipment_rejects.csv'
elif export_type == 'equipment_jobs':
result = get_equipment_jobs(
params.get('equipment_ids', []),
params.get('start_date'),
params.get('end_date')
)
filename = 'equipment_jobs.csv'
else:
return jsonify({'error': f'不支援的匯出類型: {export_type}'}), 400
if result is None or 'error' in result:
error_msg = result.get('error', '查詢失敗') if result else '查詢失敗'
return jsonify({'error': error_msg}), 400
export_data = result.get('data', [])
if not export_data:
return jsonify({'error': '查無資料'}), 404
# Stream CSV response
return Response(
generate_csv_stream(export_data),
mimetype='text/csv; charset=utf-8',
headers={
'Content-Disposition': f'attachment; filename={filename}'
}
)
except Exception as exc:
return jsonify({'error': f'匯出失敗: {str(exc)}'}), 500

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,10 @@ import json
import logging
import threading
import time
from datetime import datetime
from datetime import datetime, timezone, timedelta
# Taiwan timezone (GMT+8)
TW_TIMEZONE = timezone(timedelta(hours=8))
from typing import Any, Dict, List, Optional, Tuple
from mes_dashboard.core.database import read_sql_df
@@ -312,7 +315,7 @@ def _save_to_redis(aggregated: List[Dict[str, Any]]) -> bool:
# Serialize data
data_json = json.dumps(aggregated, ensure_ascii=False, default=str)
updated_at = datetime.now().isoformat()
updated_at = datetime.now(TW_TIMEZONE).isoformat()
count = len(aggregated)
# Atomic update using pipeline

View File

@@ -13,7 +13,10 @@ import logging
import os
import threading
import time
from datetime import datetime
from datetime import datetime, timezone, timedelta
# Taiwan timezone (GMT+8)
TW_TIMEZONE = timezone(timedelta(hours=8))
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
@@ -211,7 +214,7 @@ def _sync_to_redis(df: pd.DataFrame, version: str) -> bool:
data_json = df_copy.to_json(orient='records', force_ascii=False)
# Atomic update using pipeline
now = datetime.now().isoformat()
now = datetime.now(TW_TIMEZONE).isoformat()
pipe = client.pipeline()
pipe.set(_get_key("data"), data_json)
pipe.set(_get_key("meta:version"), version)

View File

@@ -0,0 +1,66 @@
-- Adjacent Lots Query (前後批查詢)
-- Finds lots processed before and after a target lot on same equipment with same spec
--
-- Parameters:
-- :equipment_id - Target equipment ID
-- :spec_name - Target spec name
-- :target_trackin_time - Target lot's TRACKINTIMESTAMP
-- :time_window_hours - Time window in hours (default 24)
--
-- Note: Uses ROW_NUMBER() to identify relative position
-- Limited to ±time_window_hours to control result set
WITH time_bounds AS (
SELECT
:target_trackin_time - INTERVAL '1' HOUR * :time_window_hours AS time_start,
:target_trackin_time + INTERVAL '1' HOUR * :time_window_hours AS time_end
FROM DUAL
),
ranked_lots AS (
SELECT
h.CONTAINERID,
h.EQUIPMENTID,
h.EQUIPMENTNAME,
h.SPECNAME,
h.PJ_TYPE,
h.TRACKINTIMESTAMP,
h.TRACKOUTTIMESTAMP,
h.TRACKINQTY,
h.TRACKOUTQTY,
h.FINISHEDRUNCARD,
h.PJ_WORKORDER,
c.CONTAINERNAME,
ROW_NUMBER() OVER (
PARTITION BY h.EQUIPMENTID, h.SPECNAME
ORDER BY h.TRACKINTIMESTAMP
) AS rn
FROM DWH.DW_MES_LOTWIPHISTORY h
LEFT JOIN DWH.DW_MES_CONTAINER c ON h.CONTAINERID = c.CONTAINERID
CROSS JOIN time_bounds tb
WHERE h.EQUIPMENTID = :equipment_id
AND h.SPECNAME = :spec_name
AND h.TRACKINTIMESTAMP BETWEEN tb.time_start AND tb.time_end
),
target_lot AS (
SELECT rn AS target_rn
FROM ranked_lots
WHERE TRACKINTIMESTAMP = :target_trackin_time
)
SELECT
r.CONTAINERID,
r.EQUIPMENTID,
r.EQUIPMENTNAME,
r.SPECNAME,
r.PJ_TYPE,
r.TRACKINTIMESTAMP,
r.TRACKOUTTIMESTAMP,
r.TRACKINQTY,
r.TRACKOUTQTY,
r.FINISHEDRUNCARD,
r.PJ_WORKORDER,
r.CONTAINERNAME,
r.rn - t.target_rn AS RELATIVE_POSITION
FROM ranked_lots r
CROSS JOIN target_lot t
WHERE r.rn BETWEEN (t.target_rn - 3) AND (t.target_rn + 3)
ORDER BY r.rn

View File

@@ -0,0 +1,33 @@
-- Equipment JOB Records Query
-- Retrieves JOB records for equipment in a time period
--
-- Parameters:
-- :start_date - Start date (YYYY-MM-DD)
-- :end_date - End date (YYYY-MM-DD)
--
-- Dynamic placeholders:
-- EQUIPMENT_FILTER - Equipment filter condition (on RESOURCEID)
--
-- Note: DW_MES_JOB uses RESOURCEID/RESOURCENAME
-- EQUIPMENTID = RESOURCEID (same ID system)
-- Uses CREATEDATE for date filtering
SELECT
JOBID,
RESOURCEID,
RESOURCENAME,
JOBSTATUS,
JOBMODELNAME,
JOBORDERNAME,
CREATEDATE,
COMPLETEDATE,
CAUSECODENAME,
REPAIRCODENAME,
SYMPTOMCODENAME,
CONTAINERIDS,
CONTAINERNAMES
FROM DWH.DW_MES_JOB
WHERE CREATEDATE >= TO_DATE(:start_date, 'YYYY-MM-DD')
AND CREATEDATE < TO_DATE(:end_date, 'YYYY-MM-DD') + 1
AND {{ EQUIPMENT_FILTER }}
ORDER BY RESOURCENAME, CREATEDATE DESC

View File

@@ -0,0 +1,31 @@
-- Equipment Lots List Query
-- Retrieves all lots processed by equipment in a time period
--
-- Parameters:
-- :start_date - Start date (YYYY-MM-DD)
-- :end_date - End date (YYYY-MM-DD)
--
-- Dynamic placeholders:
-- EQUIPMENT_FILTER - Equipment filter condition (on EQUIPMENTID)
--
-- Note: Uses EQUIPMENTID/EQUIPMENTNAME (NOT RESOURCEID/RESOURCENAME)
-- JOIN CONTAINER to get CONTAINERNAME (LOT ID)
SELECT
h.CONTAINERID,
c.CONTAINERNAME,
h.EQUIPMENTID,
h.EQUIPMENTNAME,
h.FINISHEDRUNCARD,
h.SPECNAME,
h.TRACKINTIMESTAMP,
h.TRACKOUTTIMESTAMP,
h.TRACKINQTY,
h.TRACKOUTQTY,
h.PJ_WORKORDER
FROM DWH.DW_MES_LOTWIPHISTORY h
LEFT JOIN DWH.DW_MES_CONTAINER c ON h.CONTAINERID = c.CONTAINERID
WHERE h.TRACKINTIMESTAMP >= TO_DATE(:start_date, 'YYYY-MM-DD')
AND h.TRACKINTIMESTAMP < TO_DATE(:end_date, 'YYYY-MM-DD') + 1
AND {{ EQUIPMENT_FILTER }}
ORDER BY h.EQUIPMENTNAME, h.TRACKINTIMESTAMP

View File

@@ -0,0 +1,25 @@
-- Equipment Materials Consumption Summary
-- Aggregates material consumption by equipment for a time period
--
-- Parameters:
-- :start_date - Start date (YYYY-MM-DD)
-- :end_date - End date (YYYY-MM-DD)
--
-- Dynamic placeholders:
-- EQUIPMENT_FILTER - Equipment filter condition (on EQUIPMENTNAME)
--
-- Note: Uses MATERIALPARTNAME (NOT MATERIALNAME)
-- Uses QTYCONSUMED (NOT CONSUMEQTY)
-- Uses TXNDATE (NOT TXNDATETIME)
SELECT
EQUIPMENTNAME,
MATERIALPARTNAME,
SUM(QTYCONSUMED) AS TOTAL_CONSUMED,
COUNT(DISTINCT CONTAINERID) AS LOT_COUNT
FROM DWH.DW_MES_LOTMATERIALSHISTORY
WHERE TXNDATE >= TO_DATE(:start_date, 'YYYY-MM-DD')
AND TXNDATE < TO_DATE(:end_date, 'YYYY-MM-DD') + 1
AND {{ EQUIPMENT_FILTER }}
GROUP BY EQUIPMENTNAME, MATERIALPARTNAME
ORDER BY EQUIPMENTNAME, TOTAL_CONSUMED DESC

View File

@@ -0,0 +1,27 @@
-- Equipment Reject Statistics
-- Aggregates reject statistics by equipment for a time period
--
-- Parameters:
-- :start_date - Start date (YYYY-MM-DD)
-- :end_date - End date (YYYY-MM-DD)
--
-- Dynamic placeholders:
-- EQUIPMENT_FILTER - Equipment filter condition (on EQUIPMENTNAME)
--
-- Note: LOTREJECTHISTORY only has EQUIPMENTNAME, NO EQUIPMENTID
-- If need to filter by EQUIPMENTID, must JOIN LOTWIPHISTORY
-- Uses LOSSREASONNAME (NOT REJECTREASONNAME)
-- Uses TXNDATE (NOT TXNDATETIME)
SELECT
EQUIPMENTNAME,
REJECTCATEGORYNAME,
LOSSREASONNAME,
SUM(REJECTQTY) AS TOTAL_REJECT_QTY,
COUNT(DISTINCT CONTAINERID) AS AFFECTED_LOT_COUNT
FROM DWH.DW_MES_LOTREJECTHISTORY
WHERE TXNDATE >= TO_DATE(:start_date, 'YYYY-MM-DD')
AND TXNDATE < TO_DATE(:end_date, 'YYYY-MM-DD') + 1
AND {{ EQUIPMENT_FILTER }}
GROUP BY EQUIPMENTNAME, REJECTCATEGORYNAME, LOSSREASONNAME
ORDER BY EQUIPMENTNAME, TOTAL_REJECT_QTY DESC

View File

@@ -0,0 +1,41 @@
-- Equipment Status Hours Query
-- Aggregates status hours by equipment for a time period
--
-- Parameters:
-- :start_date - Start date (YYYY-MM-DD)
-- :end_date - End date (YYYY-MM-DD)
--
-- Dynamic placeholders:
-- EQUIPMENT_FILTER - Equipment filter condition
--
-- Note: RESOURCESTATUS_SHIFT requires JOIN with RESOURCE to get RESOURCENAME
-- Uses HISTORYID = RESOURCEID relationship
-- Uses OLDSTATUSNAME/NEWSTATUSNAME (NOT STATUSNAME)
-- Uses TXNDATE (NOT SHIFTDATE)
-- Hours field: HOURS
SELECT
r.RESOURCEID,
r.RESOURCENAME,
SUM(CASE WHEN s.NEWSTATUSNAME = 'PRD' THEN s.HOURS ELSE 0 END) AS PRD_HOURS,
SUM(CASE WHEN s.NEWSTATUSNAME = 'SBY' THEN s.HOURS ELSE 0 END) AS SBY_HOURS,
SUM(CASE WHEN s.NEWSTATUSNAME = 'UDT' THEN s.HOURS ELSE 0 END) AS UDT_HOURS,
SUM(CASE WHEN s.NEWSTATUSNAME = 'SDT' THEN s.HOURS ELSE 0 END) AS SDT_HOURS,
SUM(CASE WHEN s.NEWSTATUSNAME = 'EGT' THEN s.HOURS ELSE 0 END) AS EGT_HOURS,
SUM(CASE WHEN s.NEWSTATUSNAME = 'NST' THEN s.HOURS ELSE 0 END) AS NST_HOURS,
SUM(s.HOURS) AS TOTAL_HOURS,
ROUND(
SUM(CASE WHEN s.NEWSTATUSNAME = 'PRD' THEN s.HOURS ELSE 0 END) * 100.0 /
NULLIF(
SUM(CASE WHEN s.NEWSTATUSNAME IN ('PRD', 'SBY', 'UDT') THEN s.HOURS ELSE 0 END),
0
),
2
) AS OU_PERCENT
FROM DWH.DW_MES_RESOURCESTATUS_SHIFT s
JOIN DWH.DW_MES_RESOURCE r ON s.HISTORYID = r.RESOURCEID
WHERE s.TXNDATE >= TO_DATE(:start_date, 'YYYY-MM-DD')
AND s.TXNDATE < TO_DATE(:end_date, 'YYYY-MM-DD') + 1
AND {{ EQUIPMENT_FILTER }}
GROUP BY r.RESOURCEID, r.RESOURCENAME
ORDER BY r.RESOURCENAME

View File

@@ -0,0 +1,52 @@
-- LOT Production History Query
-- Retrieves complete production history for a LOT
--
-- Parameters:
-- :container_id - CONTAINERID to query (16-char hex)
--
-- Note: Uses EQUIPMENTID/EQUIPMENTNAME (NOT RESOURCEID/RESOURCENAME)
-- Time fields: TRACKINTIMESTAMP/TRACKOUTTIMESTAMP (NOT TXNDATETIME)
-- Partial track-out: Same LOT may have multiple records with same track-in
-- but different track-out times. We take the latest track-out time.
-- Only includes records with actual equipment (excludes checkpoint stations)
WITH ranked_history AS (
SELECT
h.CONTAINERID,
h.WORKCENTERNAME,
h.EQUIPMENTID,
h.EQUIPMENTNAME,
h.SPECNAME,
h.TRACKINTIMESTAMP,
h.TRACKOUTTIMESTAMP,
h.TRACKINQTY,
h.TRACKOUTQTY,
h.FINISHEDRUNCARD,
h.PJ_WORKORDER,
c.CONTAINERNAME,
ROW_NUMBER() OVER (
PARTITION BY h.CONTAINERID, h.EQUIPMENTID, h.SPECNAME, h.TRACKINTIMESTAMP
ORDER BY h.TRACKOUTTIMESTAMP DESC NULLS LAST
) AS rn
FROM DWH.DW_MES_LOTWIPHISTORY h
LEFT JOIN DWH.DW_MES_CONTAINER c ON h.CONTAINERID = c.CONTAINERID
WHERE h.CONTAINERID = :container_id
AND h.EQUIPMENTID IS NOT NULL
AND h.TRACKINTIMESTAMP IS NOT NULL
)
SELECT
CONTAINERID,
WORKCENTERNAME,
EQUIPMENTID,
EQUIPMENTNAME,
SPECNAME,
TRACKINTIMESTAMP,
TRACKOUTTIMESTAMP,
TRACKINQTY,
TRACKOUTQTY,
FINISHEDRUNCARD,
PJ_WORKORDER,
CONTAINERNAME
FROM ranked_history
WHERE rn = 1
ORDER BY TRACKINTIMESTAMP

View File

@@ -0,0 +1,34 @@
-- LOT HOLD Records Query
-- Retrieves HOLD/RELEASE history for a LOT
--
-- Parameters:
-- :container_id - CONTAINERID to query (16-char hex)
--
-- Note: Uses HOLDTXNDATE/RELEASETXNDATE (NOT TXNDATETIME)
-- NULL RELEASETXNDATE means currently HOLD
SELECT
CONTAINERID,
WORKCENTERNAME,
HOLDTXNDATE,
HOLDEMP,
HOLDEMPDEPTNAME,
HOLDREASONNAME,
HOLDCOMMENTS,
RELEASETXNDATE,
RELEASEEMP,
RELEASECOMMENTS,
NCRID,
CASE
WHEN RELEASETXNDATE IS NULL THEN 'HOLD'
ELSE 'RELEASED'
END AS HOLD_STATUS,
CASE
WHEN RELEASETXNDATE IS NULL THEN
ROUND((SYSDATE - HOLDTXNDATE) * 24, 2)
ELSE
ROUND((RELEASETXNDATE - HOLDTXNDATE) * 24, 2)
END AS HOLD_HOURS
FROM DWH.DW_MES_HOLDRELEASEHISTORY
WHERE CONTAINERID = :container_id
ORDER BY HOLDTXNDATE DESC

View File

@@ -0,0 +1,35 @@
-- LOT Related JOB Records Query
-- Retrieves JOB records for equipment during LOT processing
--
-- Parameters:
-- :equipment_id - Equipment ID (EQUIPMENTID = RESOURCEID in same ID system)
-- :time_start - Start time of LOT processing
-- :time_end - End time of LOT processing
--
-- Note: DW_MES_JOB uses RESOURCEID/RESOURCENAME
-- LOTWIPHISTORY uses EQUIPMENTID/EQUIPMENTNAME
-- EQUIPMENTID = RESOURCEID (same ID system, can JOIN directly)
-- CONTAINERIDS/CONTAINERNAMES are comma-separated strings
SELECT
j.JOBID,
j.RESOURCEID,
j.RESOURCENAME,
j.JOBSTATUS,
j.JOBMODELNAME,
j.JOBORDERNAME,
j.CREATEDATE,
j.COMPLETEDATE,
j.CAUSECODENAME,
j.REPAIRCODENAME,
j.SYMPTOMCODENAME,
j.CONTAINERIDS,
j.CONTAINERNAMES
FROM DWH.DW_MES_JOB j
WHERE j.RESOURCEID = :equipment_id
AND (
(j.CREATEDATE BETWEEN :time_start AND :time_end)
OR (j.COMPLETEDATE BETWEEN :time_start AND :time_end)
OR (j.CREATEDATE <= :time_start AND (j.COMPLETEDATE IS NULL OR j.COMPLETEDATE >= :time_end))
)
ORDER BY j.CREATEDATE

View File

@@ -0,0 +1,21 @@
-- LOT Materials Consumption Query
-- Retrieves material consumption records for a LOT
--
-- Parameters:
-- :container_id - CONTAINERID to query (16-char hex)
--
-- Note: Uses MATERIALPARTNAME (NOT MATERIALNAME)
-- Uses QTYCONSUMED (NOT CONSUMEQTY)
-- Uses TXNDATE (NOT TXNDATETIME)
SELECT
CONTAINERID,
MATERIALPARTNAME,
MATERIALLOTNAME,
QTYCONSUMED,
WORKCENTERNAME,
EQUIPMENTNAME,
TXNDATE
FROM DWH.DW_MES_LOTMATERIALSHISTORY
WHERE CONTAINERID = :container_id
ORDER BY TXNDATE

View File

@@ -0,0 +1,24 @@
-- LOT Reject Records Query
-- Retrieves reject (defect) records for a LOT
--
-- Parameters:
-- :container_id - CONTAINERID to query (16-char hex)
--
-- Note: Uses LOSSREASONNAME (NOT REJECTREASONNAME)
-- Uses TXNDATE (NOT TXNDATETIME)
-- Only has EQUIPMENTNAME, NO EQUIPMENTID field
SELECT
CONTAINERID,
REJECTCATEGORYNAME,
LOSSREASONNAME,
REJECTQTY,
WORKCENTERNAME,
EQUIPMENTNAME,
TXNDATE,
COMMENTS,
REJECTCAUSE,
REJECTCOMMENT
FROM DWH.DW_MES_LOTREJECTHISTORY
WHERE CONTAINERID = :container_id
ORDER BY TXNDATE

View File

@@ -0,0 +1,17 @@
-- LOT ID to CONTAINERID Resolution
-- Converts user-input LOT ID (CONTAINERNAME) to internal CONTAINERID
--
-- Parameters:
-- :container_names - List of CONTAINERNAME values (bind variable list)
--
-- Note: CONTAINERID is 16-char hex (e.g., '48810380001cba48')
-- CONTAINERNAME is user-visible LOT ID (e.g., 'GA23100020-A00-011')
SELECT
CONTAINERID,
CONTAINERNAME,
MFGORDERNAME,
SPECNAME,
QTY
FROM DWH.DW_MES_CONTAINER
WHERE CONTAINERNAME IN ({{ CONTAINER_NAMES }})

View File

@@ -0,0 +1,13 @@
-- Serial Number (流水號) to CONTAINERID Resolution
-- Converts finished product serial number to CONTAINERID list
--
-- Parameters:
-- :finished_names - List of FINISHEDNAME values (bind variable list)
--
-- Note: One FINISHEDNAME may correspond to multiple CONTAINERIDs (2-5 typical)
SELECT DISTINCT
CONTAINERID,
FINISHEDNAME
FROM DWH.DW_MES_PJ_COMBINEDASSYLOTS
WHERE FINISHEDNAME IN ({{ FINISHED_NAMES }})

View File

@@ -0,0 +1,14 @@
-- GA Work Order to CONTAINERID Resolution
-- Expands work order to all associated CONTAINERIDs
--
-- Parameters:
-- :work_orders - List of PJ_WORKORDER values (bind variable list)
--
-- Note: One work order may expand to many CONTAINERIDs (can be 100+)
-- Using LOTWIPHISTORY because PJ_WORKORDER has 100% completeness there
SELECT DISTINCT
CONTAINERID,
PJ_WORKORDER
FROM DWH.DW_MES_LOTWIPHISTORY
WHERE PJ_WORKORDER IN ({{ WORK_ORDERS }})

View File

@@ -0,0 +1,26 @@
-- LOT Split/Merge History Query (拆併批歷史紀錄)
-- Query by CONTAINERID list from same work order
-- Check both TARGET (CONTAINERID) and SOURCE (FROMCONTAINERID) to find all related records
WITH work_order_lots AS (
SELECT CONTAINERID
FROM DWH.DW_MES_CONTAINER
WHERE MFGORDERNAME = :work_order
)
SELECT
h.HISTORYMAINLINEID,
h.CDONAME AS OPERATION_TYPE,
h.CONTAINERID AS TARGET_CONTAINERID,
h.CONTAINERNAME AS TARGET_LOT,
h.FROMCONTAINERID AS SOURCE_CONTAINERID,
h.FROMCONTAINERNAME AS SOURCE_LOT,
h.QTY AS TARGET_QTY,
h.TXNDATE
FROM DWH.DW_MES_HM_LOTMOVEOUT h
WHERE (
h.CONTAINERID IN (SELECT CONTAINERID FROM work_order_lots)
OR h.FROMCONTAINERID IN (SELECT CONTAINERID FROM work_order_lots)
)
AND h.FROMCONTAINERID IS NOT NULL
ORDER BY h.TXNDATE
FETCH FIRST 100 ROWS ONLY

View File

@@ -0,0 +1,30 @@
-- LOT Split/Merge Records (拆併批紀錄)
-- Shows what serial numbers were produced from this LOT
-- and what other LOTs were combined together
--
-- Parameters:
-- :container_id - Target CONTAINERID (16-char hex)
--
-- Returns:
-- - FINISHEDNAME: Serial number produced
-- - Related LOTs that were combined to create each serial number
-- - PJ_COMBINEDRATIO: Contribution ratio (1.0 = 100%)
-- - PJ_GOODDIEQTY: Good die quantity contributed
SELECT
p.FINISHEDNAME,
p.CONTAINERID,
p.CONTAINERNAME AS LOT_ID,
p.PJ_WORKORDER,
p.PJ_COMBINEDRATIO,
p.PJ_GOODDIEQTY,
p.PJ_ORIGINALGOODDIEQTY,
p.ORIGINALSTARTDATE
FROM DWH.DW_MES_PJ_COMBINEDASSYLOTS p
WHERE p.FINISHEDNAME IN (
-- Find all serial numbers that this LOT contributed to
SELECT DISTINCT FINISHEDNAME
FROM DWH.DW_MES_PJ_COMBINEDASSYLOTS
WHERE CONTAINERID = :container_id
)
ORDER BY p.FINISHEDNAME, p.ORIGINALSTARTDATE, p.CONTAINERNAME

File diff suppressed because it is too large Load Diff

View File

@@ -325,6 +325,9 @@
{% if can_view_page('/job-query') %}
<button class="tab" data-target="jobQueryFrame">設備維修查詢</button>
{% endif %}
{% if can_view_page('/query-tool') %}
<button class="tab" data-target="queryToolFrame">批次追蹤工具</button>
{% endif %}
</div>
<div class="panel">
@@ -347,6 +350,9 @@
{% if can_view_page('/job-query') %}
<iframe id="jobQueryFrame" data-src="/job-query" title="設備維修查詢"></iframe>
{% endif %}
{% if can_view_page('/query-tool') %}
<iframe id="queryToolFrame" data-src="/query-tool" title="批次追蹤工具"></iframe>
{% endif %}
</div>
</div>
{% endblock %}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,538 @@
# -*- coding: utf-8 -*-
"""Integration tests for Query Tool API routes.
Tests the API endpoints with mocked service dependencies:
- Input validation (empty, over limit, invalid format)
- Success responses
- Error handling
"""
import pytest
import json
from unittest.mock import patch, MagicMock
from mes_dashboard import create_app
@pytest.fixture
def app():
"""Create test Flask application."""
app = create_app()
app.config['TESTING'] = True
return app
@pytest.fixture
def client(app):
"""Create test client."""
return app.test_client()
class TestQueryToolPage:
"""Tests for /query-tool page route."""
def test_page_returns_html(self, client):
"""Should return the query tool page."""
response = client.get('/query-tool')
assert response.status_code == 200
assert b'html' in response.data.lower()
class TestResolveEndpoint:
"""Tests for /api/query-tool/resolve endpoint."""
def test_missing_input_type(self, client):
"""Should return error without input_type."""
response = client.post(
'/api/query-tool/resolve',
json={
'values': ['GA23100020-A00-001']
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_missing_values(self, client):
"""Should return error without values."""
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_empty_values(self, client):
"""Should return error for empty values list."""
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': []
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_values_over_limit(self, client):
"""Should reject values exceeding limit."""
# More than MAX_LOT_IDS (50)
values = [f'GA{i:09d}' for i in range(51)]
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': values
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '超過上限' in data['error'] or '50' in data['error']
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_success(self, mock_resolve, client):
"""Should return resolved LOT IDs on success."""
mock_resolve.return_value = {
'data': [
{
'container_id': '488103800029578b',
'lot_id': 'GA23100020-A00-001',
'input_value': 'GA23100020-A00-001',
'spec_name': 'SPEC-001'
}
],
'total': 1,
'input_count': 1,
'not_found': []
}
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': ['GA23100020-A00-001']
}
)
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert data['total'] == 1
assert data['data'][0]['lot_id'] == 'GA23100020-A00-001'
@patch('mes_dashboard.routes.query_tool_routes.resolve_lots')
def test_resolve_not_found(self, mock_resolve, client):
"""Should return not_found list for missing LOT IDs."""
mock_resolve.return_value = {
'data': [],
'total': 0,
'input_count': 1,
'not_found': ['INVALID-LOT-ID']
}
response = client.post(
'/api/query-tool/resolve',
json={
'input_type': 'lot_id',
'values': ['INVALID-LOT-ID']
}
)
assert response.status_code == 200
data = json.loads(response.data)
assert data['total'] == 0
assert 'INVALID-LOT-ID' in data['not_found']
class TestLotHistoryEndpoint:
"""Tests for /api/query-tool/lot-history endpoint."""
def test_missing_container_id(self, client):
"""Should return error without container_id."""
response = client.get('/api/query-tool/lot-history')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
@patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
def test_lot_history_success(self, mock_query, client):
"""Should return lot history on success."""
mock_query.return_value = {
'data': [
{
'CONTAINERID': '488103800029578b',
'EQUIPMENTNAME': 'ASSY-01',
'SPECNAME': 'SPEC-001',
'TRACKINTIMESTAMP': '2024-01-15 10:30:00',
'TRACKOUTTIMESTAMP': '2024-01-15 11:00:00'
}
],
'total': 1
}
response = client.get('/api/query-tool/lot-history?container_id=488103800029578b')
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert data['total'] == 1
@patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
def test_lot_history_service_error(self, mock_query, client):
"""Should return error from service."""
mock_query.return_value = {'error': '查詢失敗'}
response = client.get('/api/query-tool/lot-history?container_id=invalid')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
class TestAdjacentLotsEndpoint:
"""Tests for /api/query-tool/adjacent-lots endpoint."""
def test_missing_equipment_id(self, client):
"""Should return error without equipment_id."""
response = client.get(
'/api/query-tool/adjacent-lots?'
'spec_name=SPEC-001&trackin_time=2024-01-15T10:30:00'
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_missing_spec_name(self, client):
"""Should return error without spec_name."""
response = client.get(
'/api/query-tool/adjacent-lots?'
'equipment_id=EQ001&trackin_time=2024-01-15T10:30:00'
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_missing_target_time(self, client):
"""Should return error without target_time."""
response = client.get(
'/api/query-tool/adjacent-lots?'
'equipment_id=EQ001&spec_name=SPEC-001'
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
@patch('mes_dashboard.routes.query_tool_routes.get_adjacent_lots')
def test_adjacent_lots_success(self, mock_query, client):
"""Should return adjacent lots on success."""
mock_query.return_value = {
'data': [
{
'CONTAINERID': '488103800029578a',
'CONTAINERNAME': 'GA23100020-A00-000',
'relative_position': -1
},
{
'CONTAINERID': '488103800029578b',
'CONTAINERNAME': 'GA23100020-A00-001',
'relative_position': 0
},
{
'CONTAINERID': '488103800029578c',
'CONTAINERNAME': 'GA23100020-A00-002',
'relative_position': 1
}
],
'total': 3
}
response = client.get(
'/api/query-tool/adjacent-lots?'
'equipment_id=EQ001&spec_name=SPEC-001&target_time=2024-01-15T10:30:00'
)
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert data['total'] == 3
class TestLotAssociationsEndpoint:
"""Tests for /api/query-tool/lot-associations endpoint."""
def test_missing_container_id(self, client):
"""Should return error without container_id."""
response = client.get('/api/query-tool/lot-associations?type=materials')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_missing_type(self, client):
"""Should return error without type."""
response = client.get('/api/query-tool/lot-associations?container_id=488103800029578b')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_invalid_type(self, client):
"""Should return error for invalid association type."""
response = client.get(
'/api/query-tool/lot-associations?container_id=488103800029578b&type=invalid'
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '不支援' in data['error'] or 'type' in data['error'].lower()
@patch('mes_dashboard.routes.query_tool_routes.get_lot_materials')
def test_lot_materials_success(self, mock_query, client):
"""Should return lot materials on success."""
mock_query.return_value = {
'data': [
{
'MATERIALTYPE': 'TypeA',
'MATERIALNAME': 'Material-001',
'QTY': 100
}
],
'total': 1
}
response = client.get(
'/api/query-tool/lot-associations?container_id=488103800029578b&type=materials'
)
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert data['total'] == 1
class TestEquipmentPeriodEndpoint:
"""Tests for /api/query-tool/equipment-period endpoint."""
def test_missing_query_type(self, client):
"""Should return error without query_type."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-01-01',
'end_date': '2024-01-31'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '查詢類型' in data['error'] or 'type' in data['error'].lower()
def test_empty_equipment_ids(self, client):
"""Should return error for empty equipment_ids."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': [],
'start_date': '2024-01-01',
'end_date': '2024-01-31',
'query_type': 'status_hours'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_missing_start_date(self, client):
"""Should return error without start_date."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'end_date': '2024-01-31',
'query_type': 'status_hours'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_missing_end_date(self, client):
"""Should return error without end_date."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-01-01',
'query_type': 'status_hours'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_invalid_date_range(self, client):
"""Should return error for end date before start date."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-12-31',
'end_date': '2024-01-01',
'query_type': 'status_hours'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '結束日期' in data['error'] or '早於' in data['error']
def test_date_range_exceeds_limit(self, client):
"""Should reject date range > 90 days."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-01-01',
'end_date': '2024-06-01',
'query_type': 'status_hours'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '90' in data['error']
def test_invalid_query_type(self, client):
"""Should reject invalid query_type."""
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-01-01',
'end_date': '2024-01-31',
'query_type': 'invalid_type'
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '查詢類型' in data['error'] or 'type' in data['error'].lower()
@patch('mes_dashboard.routes.query_tool_routes.get_equipment_status_hours')
def test_equipment_status_hours_success(self, mock_status, client):
"""Should return equipment status hours on success."""
mock_status.return_value = {'data': [], 'total': 0}
response = client.post(
'/api/query-tool/equipment-period',
json={
'equipment_ids': ['EQ001'],
'start_date': '2024-01-01',
'end_date': '2024-01-31',
'query_type': 'status_hours'
}
)
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
class TestExportCsvEndpoint:
"""Tests for /api/query-tool/export-csv endpoint."""
def test_missing_export_type(self, client):
"""Should return error without export_type."""
response = client.post(
'/api/query-tool/export-csv',
json={
'params': {'container_id': '488103800029578b'}
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_invalid_export_type(self, client):
"""Should return error for invalid export_type."""
response = client.post(
'/api/query-tool/export-csv',
json={
'export_type': 'invalid_type',
'params': {}
}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert '不支援' in data['error'] or 'type' in data['error'].lower()
@patch('mes_dashboard.routes.query_tool_routes.get_lot_history')
def test_export_lot_history_success(self, mock_get_history, client):
"""Should return CSV for lot history."""
mock_get_history.return_value = {
'data': [
{
'EQUIPMENTNAME': 'ASSY-01',
'SPECNAME': 'SPEC-001',
'TRACKINTIMESTAMP': '2024-01-15 10:00:00'
}
],
'total': 1
}
response = client.post(
'/api/query-tool/export-csv',
json={
'export_type': 'lot_history',
'params': {'container_id': '488103800029578b'}
}
)
assert response.status_code == 200
assert 'text/csv' in response.content_type
class TestEquipmentListEndpoint:
"""Tests for /api/query-tool/equipment-list endpoint."""
@patch('mes_dashboard.services.resource_cache.get_all_resources')
def test_get_equipment_list_success(self, mock_get_resources, client):
"""Should return equipment list."""
mock_get_resources.return_value = [
{
'RESOURCEID': 'EQ001',
'RESOURCENAME': 'ASSY-01',
'WORKCENTERNAME': 'WC-A',
'RESOURCEFAMILYNAME': 'FAM-01'
},
{
'RESOURCEID': 'EQ002',
'RESOURCENAME': 'ASSY-02',
'WORKCENTERNAME': 'WC-B',
'RESOURCEFAMILYNAME': 'FAM-02'
}
]
response = client.get('/api/query-tool/equipment-list')
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert 'total' in data
assert data['total'] == 2
@patch('mes_dashboard.services.resource_cache.get_all_resources')
def test_get_equipment_list_empty(self, mock_get_resources, client):
"""Should return error when no equipment available."""
mock_get_resources.return_value = []
response = client.get('/api/query-tool/equipment-list')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
@patch('mes_dashboard.services.resource_cache.get_all_resources')
def test_get_equipment_list_exception(self, mock_get_resources, client):
"""Should handle exception gracefully."""
mock_get_resources.side_effect = Exception('Database error')
response = client.get('/api/query-tool/equipment-list')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data

View File

@@ -0,0 +1,294 @@
# -*- coding: utf-8 -*-
"""Unit tests for Query Tool service functions.
Tests the core service functions without database dependencies:
- Input validation (LOT, equipment, date range)
- IN clause building helpers
- Constants validation
"""
import pytest
from mes_dashboard.services.query_tool_service import (
validate_date_range,
validate_lot_input,
validate_equipment_input,
_build_in_clause,
_build_in_filter,
BATCH_SIZE,
MAX_LOT_IDS,
MAX_SERIAL_NUMBERS,
MAX_WORK_ORDERS,
MAX_EQUIPMENTS,
MAX_DATE_RANGE_DAYS,
)
class TestValidateDateRange:
"""Tests for validate_date_range function."""
def test_valid_range(self):
"""Should return None for valid date range."""
result = validate_date_range('2024-01-01', '2024-01-31')
assert result is None
def test_same_day(self):
"""Should allow same day as start and end."""
result = validate_date_range('2024-01-01', '2024-01-01')
assert result is None
def test_end_before_start(self):
"""Should reject end date before start date."""
result = validate_date_range('2024-12-31', '2024-01-01')
assert result is not None
assert '結束日期' in result or '早於' in result
def test_exceeds_max_range(self):
"""Should reject date range exceeding limit."""
result = validate_date_range('2023-01-01', '2024-12-31')
assert result is not None
assert str(MAX_DATE_RANGE_DAYS) in result
def test_exactly_max_range(self):
"""Should allow exactly max range days."""
# 90 days from 2024-01-01 is 2024-03-31
result = validate_date_range('2024-01-01', '2024-03-31')
assert result is None
def test_one_day_over_max_range(self):
"""Should reject one day over max range."""
# 91 days
result = validate_date_range('2024-01-01', '2024-04-02')
assert result is not None
assert str(MAX_DATE_RANGE_DAYS) in result
def test_invalid_date_format(self):
"""Should reject invalid date format."""
result = validate_date_range('01-01-2024', '12-31-2024')
assert result is not None
assert '格式' in result or 'format' in result.lower()
def test_invalid_start_date(self):
"""Should reject invalid start date."""
result = validate_date_range('2024-13-01', '2024-12-31')
assert result is not None
assert '格式' in result or 'format' in result.lower()
def test_invalid_end_date(self):
"""Should reject invalid end date."""
result = validate_date_range('2024-01-01', '2024-02-30')
assert result is not None
assert '格式' in result or 'format' in result.lower()
def test_non_date_string(self):
"""Should reject non-date strings."""
result = validate_date_range('abc', 'def')
assert result is not None
assert '格式' in result or 'format' in result.lower()
class TestValidateLotInput:
"""Tests for validate_lot_input function."""
def test_valid_lot_ids(self):
"""Should accept valid LOT IDs within limit."""
values = ['GA23100020-A00-001', 'GA23100020-A00-002']
result = validate_lot_input('lot_id', values)
assert result is None
def test_valid_serial_numbers(self):
"""Should accept valid serial numbers within limit."""
values = ['SN001', 'SN002', 'SN003']
result = validate_lot_input('serial_number', values)
assert result is None
def test_valid_work_orders(self):
"""Should accept valid work orders within limit."""
values = ['GA231000001']
result = validate_lot_input('work_order', values)
assert result is None
def test_empty_values(self):
"""Should reject empty values list."""
result = validate_lot_input('lot_id', [])
assert result is not None
assert '至少一個' in result
def test_exceeds_lot_id_limit(self):
"""Should reject LOT IDs exceeding limit."""
values = [f'GA{i:09d}' for i in range(MAX_LOT_IDS + 1)]
result = validate_lot_input('lot_id', values)
assert result is not None
assert '超過上限' in result
assert str(MAX_LOT_IDS) in result
def test_exceeds_serial_number_limit(self):
"""Should reject serial numbers exceeding limit."""
values = [f'SN{i:06d}' for i in range(MAX_SERIAL_NUMBERS + 1)]
result = validate_lot_input('serial_number', values)
assert result is not None
assert '超過上限' in result
assert str(MAX_SERIAL_NUMBERS) in result
def test_exceeds_work_order_limit(self):
"""Should reject work orders exceeding limit."""
values = [f'WO{i:06d}' for i in range(MAX_WORK_ORDERS + 1)]
result = validate_lot_input('work_order', values)
assert result is not None
assert '超過上限' in result
assert str(MAX_WORK_ORDERS) in result
def test_exactly_at_limit(self):
"""Should accept values exactly at limit."""
values = [f'GA{i:09d}' for i in range(MAX_LOT_IDS)]
result = validate_lot_input('lot_id', values)
assert result is None
def test_unknown_input_type_uses_default_limit(self):
"""Should use default limit for unknown input types."""
values = [f'X{i}' for i in range(MAX_LOT_IDS)]
result = validate_lot_input('unknown_type', values)
assert result is None
values_over = [f'X{i}' for i in range(MAX_LOT_IDS + 1)]
result = validate_lot_input('unknown_type', values_over)
assert result is not None
class TestValidateEquipmentInput:
"""Tests for validate_equipment_input function."""
def test_valid_equipment_ids(self):
"""Should accept valid equipment IDs within limit."""
values = ['EQ001', 'EQ002', 'EQ003']
result = validate_equipment_input(values)
assert result is None
def test_empty_equipment_ids(self):
"""Should reject empty equipment list."""
result = validate_equipment_input([])
assert result is not None
assert '至少一台' in result
def test_exceeds_equipment_limit(self):
"""Should reject equipment IDs exceeding limit."""
values = [f'EQ{i:05d}' for i in range(MAX_EQUIPMENTS + 1)]
result = validate_equipment_input(values)
assert result is not None
assert '不得超過' in result
assert str(MAX_EQUIPMENTS) in result
def test_exactly_at_limit(self):
"""Should accept equipment IDs exactly at limit."""
values = [f'EQ{i:05d}' for i in range(MAX_EQUIPMENTS)]
result = validate_equipment_input(values)
assert result is None
class TestBuildInClause:
"""Tests for _build_in_clause function."""
def test_empty_list(self):
"""Should return empty list for empty input."""
result = _build_in_clause([])
assert result == []
def test_single_value(self):
"""Should return single chunk for single value."""
result = _build_in_clause(['VAL001'])
assert len(result) == 1
assert result[0] == "'VAL001'"
def test_multiple_values(self):
"""Should join multiple values with comma."""
result = _build_in_clause(['VAL001', 'VAL002', 'VAL003'])
assert len(result) == 1
assert "'VAL001'" in result[0]
assert "'VAL002'" in result[0]
assert "'VAL003'" in result[0]
assert result[0] == "'VAL001', 'VAL002', 'VAL003'"
def test_chunking(self):
"""Should chunk when exceeding batch size."""
# Create more than BATCH_SIZE values
values = [f'VAL{i:06d}' for i in range(BATCH_SIZE + 10)]
result = _build_in_clause(values)
assert len(result) == 2
# First chunk should have BATCH_SIZE items
assert result[0].count("'") == BATCH_SIZE * 2 # 2 quotes per value
def test_escape_single_quotes(self):
"""Should escape single quotes in values."""
result = _build_in_clause(["VAL'001"])
assert len(result) == 1
assert "VAL''001" in result[0] # Escaped
def test_custom_chunk_size(self):
"""Should respect custom chunk size."""
values = ['V1', 'V2', 'V3', 'V4', 'V5']
result = _build_in_clause(values, max_chunk_size=2)
assert len(result) == 3 # 2+2+1
class TestBuildInFilter:
"""Tests for _build_in_filter function."""
def test_empty_list(self):
"""Should return 1=0 for empty input (no results)."""
result = _build_in_filter([], 'COL')
assert result == "1=0"
def test_single_value(self):
"""Should build simple IN clause for single value."""
result = _build_in_filter(['VAL001'], 'COL')
assert "COL IN" in result
assert "'VAL001'" in result
def test_multiple_values(self):
"""Should build IN clause with multiple values."""
result = _build_in_filter(['VAL001', 'VAL002'], 'COL')
assert "COL IN" in result
assert "'VAL001'" in result
assert "'VAL002'" in result
def test_custom_column(self):
"""Should use custom column name."""
result = _build_in_filter(['VAL001'], 't.MYCOL')
assert "t.MYCOL IN" in result
def test_large_list_uses_or(self):
"""Should use OR for chunked results."""
# Create more than BATCH_SIZE values
values = [f'VAL{i:06d}' for i in range(BATCH_SIZE + 10)]
result = _build_in_filter(values, 'COL')
assert " OR " in result
# Should have parentheses wrapping the OR conditions
assert result.startswith("(")
assert result.endswith(")")
class TestServiceConstants:
"""Tests for service constants."""
def test_batch_size_is_reasonable(self):
"""Batch size should be <= 1000 (Oracle limit)."""
assert BATCH_SIZE <= 1000
def test_max_date_range_is_reasonable(self):
"""Max date range should be 90 days."""
assert MAX_DATE_RANGE_DAYS == 90
def test_max_lot_ids_is_reasonable(self):
"""Max LOT IDs should be sensible."""
assert 10 <= MAX_LOT_IDS <= 100
def test_max_serial_numbers_is_reasonable(self):
"""Max serial numbers should be sensible."""
assert 10 <= MAX_SERIAL_NUMBERS <= 100
def test_max_work_orders_is_reasonable(self):
"""Max work orders should be low due to expansion."""
assert MAX_WORK_ORDERS <= 20 # Work orders can expand to many LOTs
def test_max_equipments_is_reasonable(self):
"""Max equipments should be sensible."""
assert 5 <= MAX_EQUIPMENTS <= 50