feat: 重構 WIP 報表為即時 WIP 分布表

主要變更:
- 完全重寫 wip_report.html 為 Pivot 分布表格式
- 新增 3 個 WIP Distribution API (filter_options, pivot_columns, distribution)
- 橫向展開 Workcenter + Spec 組合,只顯示有資料的欄位
- 套用 WORKCENTER_GROUPS 合併邏輯 (切割→焊接→成型等12組)
- 新增 Lot Status 篩選 (Active/Hold,基於 HOLDREASONNAME)
- 排除 DUMMY 工單
- 摘要卡片顯示: 總LOT數、總數量、HOLD LOT數、HOLD數量

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
ymirliu
2026-01-19 12:01:00 +08:00
parent 0c2c253911
commit ad0cdb2a5c
3 changed files with 1863 additions and 1692 deletions

View File

@@ -322,6 +322,13 @@ def get_current_wip_subquery(days_back=90):
Uses ROW_NUMBER() analytic function for better performance.
Only scans recent data (default 90 days) to reduce scan size.
Filters out completed (8) and scrapped (128) status.
Excludes DUMMY orders (MFGORDERNAME = 'DUMMY').
Logic explanation:
- PARTITION BY CONTAINERNAME: Groups records by each LOT
- ORDER BY TXNDATE DESC: Orders by transaction time (newest first)
- rn = 1: Takes only the latest record for each LOT
- This gives us the current/latest status of each LOT
"""
return f"""
SELECT *
@@ -331,6 +338,7 @@ def get_current_wip_subquery(days_back=90):
FROM DW_MES_WIP w
WHERE w.TXNDATE >= SYSDATE - {days_back}
AND w.STATUS NOT IN (8, 128)
AND (w.MFGORDERNAME IS NULL OR w.MFGORDERNAME <> 'DUMMY')
)
WHERE rn = 1
"""
@@ -1916,6 +1924,272 @@ def api_dashboard_utilization_heatmap():
return jsonify({'success': False, 'error': '查詢失敗'}), 500
# ============================================================
# WIP Distribution Table API (即時 WIP 分布表)
# ============================================================
def query_wip_distribution_filter_options(days_back=90):
"""取得 WIP 分布表的篩選選項: packages(PRODUCTLINENAME_LEF), types(PJ_TYPE), areas(PJ_PRODUCEREGION), lot_statuses"""
try:
base_sql = get_current_wip_subquery(days_back)
sql = f"""
SELECT
PRODUCTLINENAME_LEF,
PJ_TYPE,
PJ_PRODUCEREGION,
HOLDREASONNAME
FROM ({base_sql}) wip
"""
df = read_sql_df(sql)
# 提取不重複值並排序
packages = sorted([x for x in df['PRODUCTLINENAME_LEF'].dropna().unique().tolist() if x])
types = sorted([x for x in df['PJ_TYPE'].dropna().unique().tolist() if x])
areas = sorted([x for x in df['PJ_PRODUCEREGION'].dropna().unique().tolist() if x])
# Lot 狀態: 根據 HOLDREASONNAME 判斷 - 有值=Hold, 無值=Active
lot_statuses = ['Active', 'Hold']
return {
'packages': packages,
'types': types,
'areas': areas,
'lot_statuses': lot_statuses
}
except Exception as exc:
print(f"WIP 篩選選項查詢失敗: {exc}")
import traceback
traceback.print_exc()
return None
def query_wip_distribution_pivot_columns(filters=None, days_back=90):
"""取得 WIP 分布表的 Pivot 欄位列表 (只回傳有資料的 Workcenter|Spec 組合)"""
try:
base_sql = get_current_wip_subquery(days_back)
# 建立篩選條件
where_conditions = []
if filters:
if filters.get('packages') and len(filters['packages']) > 0:
pkg_list = "', '".join(filters['packages'])
where_conditions.append(f"PRODUCTLINENAME_LEF IN ('{pkg_list}')")
if filters.get('types') and len(filters['types']) > 0:
type_list = "', '".join(filters['types'])
where_conditions.append(f"PJ_TYPE IN ('{type_list}')")
if filters.get('areas') and len(filters['areas']) > 0:
area_list = "', '".join(filters['areas'])
where_conditions.append(f"PJ_PRODUCEREGION IN ('{area_list}')")
# Lot 狀態篩選: Active = HOLDREASONNAME IS NULL, Hold = HOLDREASONNAME IS NOT NULL
if filters.get('lot_statuses') and len(filters['lot_statuses']) > 0:
status_conds = []
if 'Active' in filters['lot_statuses']:
status_conds.append("HOLDREASONNAME IS NULL")
if 'Hold' in filters['lot_statuses']:
status_conds.append("HOLDREASONNAME IS NOT NULL")
if status_conds:
where_conditions.append(f"({' OR '.join(status_conds)})")
where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
sql = f"""
SELECT
WORKCENTERNAME,
SPECNAME as WC_SPEC,
COUNT(DISTINCT CONTAINERNAME) as LOT_COUNT
FROM ({base_sql}) wip
WHERE WORKCENTERNAME IS NOT NULL
AND {where_clause}
GROUP BY WORKCENTERNAME, SPECNAME
ORDER BY LOT_COUNT DESC
"""
df = read_sql_df(sql)
# 轉換為 pivot 欄位列表,並套用 WORKCENTER_GROUPS 分組邏輯
pivot_columns = []
for _, row in df.iterrows():
wc = row['WORKCENTERNAME'] or ''
spec = row['WC_SPEC'] or ''
# 使用 get_workcenter_group 取得合併後的群組名稱和排序
group_name, order = get_workcenter_group(wc)
display_wc = group_name if group_name else wc # 合併後的顯示名稱
key = f"{wc}|{spec}" # key 仍使用原始 workcenter 以便匹配
pivot_columns.append({
'key': key,
'workcenter': wc, # 原始 workcenter
'workcenter_group': display_wc, # 合併後的群組名稱
'order': order, # 排序順序
'spec': spec,
'count': int(row['LOT_COUNT'] or 0)
})
return pivot_columns
except Exception as exc:
print(f"WIP Pivot 欄位查詢失敗: {exc}")
import traceback
traceback.print_exc()
return None
def query_wip_distribution(filters=None, limit=500, offset=0, days_back=90):
"""查詢 WIP 分布表主數據,回傳每個 Lot 的基本資訊及其所在的 Workcenter|Spec"""
try:
base_sql = get_current_wip_subquery(days_back)
# 建立篩選條件
where_conditions = []
if filters:
if filters.get('packages') and len(filters['packages']) > 0:
pkg_list = "', '".join(filters['packages'])
where_conditions.append(f"PRODUCTLINENAME_LEF IN ('{pkg_list}')")
if filters.get('types') and len(filters['types']) > 0:
type_list = "', '".join(filters['types'])
where_conditions.append(f"PJ_TYPE IN ('{type_list}')")
if filters.get('areas') and len(filters['areas']) > 0:
area_list = "', '".join(filters['areas'])
where_conditions.append(f"PJ_PRODUCEREGION IN ('{area_list}')")
# Lot 狀態篩選: Active = HOLDREASONNAME IS NULL, Hold = HOLDREASONNAME IS NOT NULL
if filters.get('lot_statuses') and len(filters['lot_statuses']) > 0:
status_conds = []
if 'Active' in filters['lot_statuses']:
status_conds.append("HOLDREASONNAME IS NULL")
if 'Hold' in filters['lot_statuses']:
status_conds.append("HOLDREASONNAME IS NOT NULL")
if status_conds:
where_conditions.append(f"({' OR '.join(status_conds)})")
if filters.get('search'):
search_term = filters['search'].replace("'", "''")
where_conditions.append(
f"(UPPER(MFGORDERNAME) LIKE UPPER('%{search_term}%') "
f"OR UPPER(CONTAINERNAME) LIKE UPPER('%{search_term}%'))"
)
where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
# 先查詢總筆數
count_sql = f"""
SELECT COUNT(DISTINCT CONTAINERNAME) as TOTAL_COUNT
FROM ({base_sql}) wip
WHERE {where_clause}
"""
count_df = read_sql_df(count_sql)
total_count = int(count_df['TOTAL_COUNT'].iloc[0]) if len(count_df) > 0 else 0
# 分頁查詢主數據
start_row = offset + 1
end_row = offset + limit
sql = f"""
SELECT * FROM (
SELECT
MFGORDERNAME,
CONTAINERNAME,
SPECNAME,
PRODUCTLINENAME_LEF,
WAFERLOT,
PJ_TYPE,
PJ_PRODUCEREGION,
EQUIPMENTS,
WORKCENTERNAME,
STATUS,
HOLDREASONNAME,
QTY,
QTY2,
TXNDATE,
ROW_NUMBER() OVER (ORDER BY TXNDATE DESC, MFGORDERNAME, CONTAINERNAME) as rn
FROM ({base_sql}) wip
WHERE {where_clause}
) WHERE rn BETWEEN {start_row} AND {end_row}
"""
df = read_sql_df(sql)
# 轉換為回傳格式
rows = []
for _, row in df.iterrows():
wc = row['WORKCENTERNAME'] or ''
spec = row['SPECNAME'] or ''
pivot_key = f"{wc}|{spec}"
# Lot 狀態判斷: HOLDREASONNAME 有值=Hold, 無值=Active
hold_reason = row['HOLDREASONNAME']
lot_status = 'Hold' if (pd.notna(hold_reason) and hold_reason) else 'Active'
rows.append({
'MFGORDERNAME': row['MFGORDERNAME'],
'CONTAINERNAME': row['CONTAINERNAME'],
'SPECNAME': row['SPECNAME'],
'PRODUCTLINENAME_LEF': row['PRODUCTLINENAME_LEF'],
'WAFERLOT': row['WAFERLOT'],
'PJ_TYPE': row['PJ_TYPE'],
'PJ_PRODUCEREGION': row['PJ_PRODUCEREGION'],
'EQUIPMENTS': row['EQUIPMENTS'],
'WORKCENTERNAME': row['WORKCENTERNAME'],
'LOT_STATUS': lot_status,
'HOLDREASONNAME': hold_reason if pd.notna(hold_reason) else None,
'QTY': int(row['QTY']) if pd.notna(row['QTY']) else 0,
'QTY2': int(row['QTY2']) if pd.notna(row['QTY2']) else 0,
'pivot_key': pivot_key
})
return {
'rows': rows,
'total_count': total_count,
'offset': offset,
'limit': limit
}
except Exception as exc:
print(f"WIP 分布表查詢失敗: {exc}")
import traceback
traceback.print_exc()
return None
@app.route('/api/wip/distribution/filter_options')
def api_wip_distribution_filter_options():
"""API: 取得 WIP 分布表篩選選項"""
days_back = request.args.get('days_back', 90, type=int)
cache_key = make_cache_key("wip_dist_filter_options", days_back)
options = cache_get(cache_key)
if options is None:
options = query_wip_distribution_filter_options(days_back)
if options:
cache_set(cache_key, options, ttl=600) # 10 分鐘快取
if options:
return jsonify({'success': True, 'data': options})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@app.route('/api/wip/distribution/pivot_columns', methods=['POST'])
def api_wip_distribution_pivot_columns():
"""API: 取得 WIP 分布表 Pivot 欄位列表"""
data = request.get_json() or {}
filters = data.get('filters')
days_back = data.get('days_back', 90)
cache_key = make_cache_key("wip_dist_pivot_cols", days_back, filters)
columns = cache_get(cache_key)
if columns is None:
columns = query_wip_distribution_pivot_columns(filters, days_back)
if columns is not None:
cache_set(cache_key, columns, ttl=300) # 5 分鐘快取
if columns is not None:
return jsonify({'success': True, 'data': columns})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@app.route('/api/wip/distribution', methods=['POST'])
def api_wip_distribution():
"""API: 查詢 WIP 分布表主數據"""
data = request.get_json() or {}
filters = data.get('filters')
limit = min(data.get('limit', 500), 1000) # 最大 1000 筆
offset = data.get('offset', 0)
days_back = data.get('days_back', 90)
result = query_wip_distribution(filters, limit, offset, days_back)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
if __name__ == '__main__':
print("正在測試數據庫連接...")
conn = get_db_connection()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff