Files
data_transform/transform_data.py
2025-08-07 15:40:21 +08:00

133 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import os
import numpy as np
def process_with_rounding(input_path, final_filename):
"""
讀取Excel執行分組聚合轉置計算統計數據(包含PPK)並四捨五入至小數點第三位,最後儲存。
"""
try:
print(f"正在讀取檔案: {input_path}")
df = pd.read_excel(input_path, sheet_name='Sheet1', engine='xlrd')
print("檔案讀取成功。")
# --- 1. 根據位置定義欄位 ---
id_cols_indices = [2, 6, 8, 9, 10] # C, G, I, J, K
id_vars_names = df.columns[id_cols_indices].tolist()
value_col_start_index = 19 # T欄位
value_vars = df.columns[value_col_start_index:].tolist()
special_cat_col_name = df.columns[6]
lsl_col_name = df.columns[9]
usl_col_name = df.columns[10]
# --- 2. 修正分組鍵,確保空值準確性 ---
def to_grouping_str(x):
if not pd.notna(x): return ''
if isinstance(x, (int, float)) and x == int(x): return str(int(x))
return str(x)
df[special_cat_col_name] = df[special_cat_col_name].apply(to_grouping_str)
df[lsl_col_name] = df[lsl_col_name].apply(to_grouping_str)
df[usl_col_name] = df[usl_col_name].apply(to_grouping_str)
print("已將分組鍵轉換為文字以進行精確分組。")
# --- 3. 執行分組與聚合 ---
def flatten_group_values(series):
return [item for item in series.values.flatten() if pd.notna(item)]
print("開始進行分組與數據合併...")
grouped = df.groupby(id_vars_names, dropna=False)[value_vars]
aggregated_series = grouped.apply(flatten_group_values)
if aggregated_series.empty or all(len(v) == 0 for v in aggregated_series):
print("警告:分組後未發現任何可合併的數據。")
return
# --- 4. 計算統計數據與PPK並進行四捨五入 ---
print("正在為每個分組計算統計數據與PPK...")
final_df = aggregated_series.reset_index(name='Aggregated_Values')
def calculate_and_round_stats(row):
values = pd.Series(row['Aggregated_Values'])
lsl_str = row[lsl_col_name]
usl_str = row[usl_col_name]
if values.empty:
return pd.Series([np.nan, np.nan, np.nan, np.nan, np.nan])
mean = values.mean()
std = values.std()
min_val = values.min()
max_val = values.max()
ppk = np.nan
if std is not None and std > 0:
try:
usl = float(usl_str)
has_usl = True
except (ValueError, TypeError): has_usl = False
try:
lsl = float(lsl_str)
has_lsl = True
except (ValueError, TypeError): has_lsl = False
if has_usl and has_lsl:
ppu = (usl - mean) / (3 * std)
ppl = (mean - lsl) / (3 * std)
ppk = min(ppu, ppl)
elif has_usl:
ppk = (usl - mean) / (3 * std)
elif has_lsl:
ppk = (mean - lsl) / (3 * std)
# *** 修改:對所有結果進行四捨五入到小數點後三位 ***
return pd.Series([
round(min_val, 3) if pd.notna(min_val) else min_val,
round(max_val, 3) if pd.notna(max_val) else max_val,
round(mean, 3) if pd.notna(mean) else mean,
round(std, 3) if pd.notna(std) else std,
round(ppk, 3) if pd.notna(ppk) else ppk
])
stats_df = final_df.apply(calculate_and_round_stats, axis=1)
stats_df.columns = ['最小值', '最大值', '平均值', '標準差', 'PPK']
stats_with_ids = pd.concat([final_df[id_vars_names], stats_df], axis=1)
stats_transposed = stats_with_ids.set_index(id_vars_names).T
print("統計數據計算與格式化完成。")
# --- 5. 準備並轉置主要數據 ---
print("正在準備與轉置主要數據...")
expanded_values = final_df['Aggregated_Values'].apply(pd.Series)
expanded_values.columns = [i + 1 for i in range(expanded_values.shape[1])]
result_df = pd.concat([final_df[id_vars_names], expanded_values], axis=1)
transposed_df = result_df.set_index(id_vars_names).T
transposed_df.index.name = "量測值編號"
print("主要數據轉置完成。")
# --- 6. 合併主要數據與統計數據 ---
final_result_df = pd.concat([transposed_df, stats_transposed])
print("已將統計結果附加到數據末尾。")
# --- 7. 儲存最終檔案 ---
output_dir = os.path.dirname(input_path)
final_output_path = os.path.join(output_dir, final_filename)
final_result_df.to_excel(final_output_path, index=True, engine='openpyxl')
print(f"成功!格式化後的最終檔案已儲存至: {final_output_path}")
except FileNotFoundError:
print(f"錯誤:找不到檔案 {input_path}")
except Exception as e:
print(f"處理過程中發生未預期的錯誤: {e}")
if __name__ == "__main__":
# 這個區塊現在僅供直接執行此腳本時測試用
# 當作為模組被 app.py 匯入時,此區塊不會被執行
print("此腳本現在是作為一個模組,請透過 app.py 啟動網頁服務來使用。")
# 以下是測試範例,您可以取消註解來進行單獨測試:
# input_file_path = r'GA25072023.xls' # 假設測試檔案在同個資料夾
# final_file_name = 'GA25072023_final_rounded_test.xlsx'
# process_with_rounding(input_file_path, final_file_name)