Files
SalesPipeline/backend/app/services/excel_parser.py
2026-01-27 19:08:46 +08:00

206 lines
8.8 KiB
Python

import re
import uuid
import math
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import pandas as pd
import chardet
from openpyxl import load_workbook
from app.config import MAX_HEADER_SCAN_ROWS, UPLOAD_DIR
def clean_value(val):
    """Normalize a single cell value so it can be JSON-serialized.

    - ``None`` and any missing/non-finite scalar (NaN, +/-Inf, ``pd.NaT``,
      ``pd.NA``) are converted to ``None``.
    - Strings have leading apostrophes stripped (Excel prepends ``'`` to
      force text formatting).
    - Everything else passes through unchanged.
    """
    if val is None:
        return None
    if isinstance(val, float) and (math.isnan(val) or math.isinf(val)):
        return None
    # NaT / pd.NA also break JSON serialization; guard with is_scalar so an
    # array-like value cannot make pd.isna return an ambiguous array.
    if pd.api.types.is_scalar(val) and pd.isna(val):
        return None
    if isinstance(val, str):
        return val.lstrip("'")  # remove leading apostrophe(s) added by Excel
    return val
def clean_dict(d: Dict) -> Dict:
    """Return a copy of *d* with every value passed through clean_value()."""
    cleaned = {}
    for key, value in d.items():
        cleaned[key] = clean_value(value)
    return cleaned
def clean_records(records: List[Dict]) -> List[Dict]:
    """Apply clean_dict() to every record, returning a new list."""
    return list(map(clean_dict, records))
# Column-name mapping table: for each supported file type, maps a standard
# field name to the list of header-name variants (Chinese and English) it may
# appear under. Comparison elsewhere is case-insensitive.
COLUMN_MAPPING = {
    # DIT opportunity export
    'dit': {
        'op_id': ['opportunity no', 'opportunity', 'op編號', 'op 編號', 'op_id', 'opid', '案件編號', '案號', 'opportunity id'],
        'op_name': ['opportunity name', '專案名稱', '案件名稱'],
        'erp_account': ['erp account', 'account no', 'erp account no', '客戶代碼', '客戶編號', 'erp_account'],
        'customer': ['account name', 'branding customer', '客戶', '客戶名稱', 'customer', 'customer name', '公司名稱'],
        'pn': ['product name', '料號', 'part number', 'pn', 'part no', 'part_number', '產品料號', 'stage/part'],
        'eau': ['eau quantity', 'eau quantity (pcs)', 'eau', '年預估量', 'annual usage', '預估用量'],
        'stage': ['stage', 'oppty product stage', '階段', 'status', '狀態', '專案階段'],
        'date': ['created date', '日期', 'date', '建立日期', 'create date']
    },
    # Sample-request export
    'sample': {
        'sample_id': ['sample_id', 'sample id', '樣品ID'],
        'order_no': ['樣品訂單號碼', '單號', 'order_no', 'order no', '樣品單號', '申請單號', '樣品訂單號'],
        'oppy_no': ['oppy no', 'oppy_no', '案號', '案件編號', 'opportunity no'],
        'cust_id': ['cust id', 'cust_id', '客戶編號', '客戶代碼', '客戶代號'],
        'customer': ['客戶名稱', '客戶簡稱', '客戶', 'customer', 'customer name'],
        'pn': ['強茂料號', 'item', '料號', 'part number', 'pn', 'part no', '產品料號', 'type'],
        'qty': ['索樣數量pcs', '索樣數量 pcs', '索樣數量 k', '數量', 'qty', 'quantity', '申請數量', '索樣數量'],
        'date': ['出貨日', '需求日', '日期', 'date', '申請日期']
    },
    # Sales-order export
    'order': {
        'order_id': ['項次', '訂單項次', '訂單編號', 'order_id', 'order id'],
        'order_no': ['訂單單號', '訂單號碼', '訂單號', 'order_no', 'order no', '銷貨單號'],
        'cust_id': ['客戶編號', '客戶代碼', '客戶代號', '客戶', 'cust_id', 'cust id', 'erp code', 'erp_code', 'erpcode', 'erp'],
        'customer': ['客戶', '客戶名稱', 'customer', 'customer name'],
        'pn': ['強茂料號', '內部料號', '料號', 'part number', 'pn', 'part no', '產品料號', 'type'],
        'qty': ['訂單需求量', '訂單量', '數量', 'qty', 'quantity', '訂購數量', '出貨數量'],
        'status': ['明細行狀態', '狀態', 'status', '訂單狀態'],
        'amount': ['台幣金額', '原幣金額(含稅)', '台幣金額(未稅)', '金額', 'amount', 'total', '訂單金額'],
        'date': ['訂單日期', '日期', 'date', 'order date', 'order_date']
    }
}
class ExcelParser:
    """Parse uploaded Excel/CSV files into normalized DataFrames.

    Per file: read raw (no header) -> locate the header row -> re-read with
    that header -> map vendor column names to the standard schema in
    COLUMN_MAPPING -> keep only mapped columns -> cache the result under a
    generated file_id.
    """

    def __init__(self):
        # file_id -> parse result (metadata + 'dataframe'); in-memory cache.
        self.parsed_files: Dict[str, Dict] = {}

    def detect_encoding(self, file_path: Path) -> str:
        """Detect a file's text encoding from its first 10 KB.

        Bug fix: chardet may report ``{'encoding': None}`` (e.g. for empty
        or undecidable input). ``result.get('encoding', 'utf-8')`` returns
        that explicit ``None`` because the key exists, so the fallback was
        dead code; ``or`` guarantees a str is returned.
        """
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read(10000))
        return result.get('encoding') or 'utf-8'

    def find_header_row(self, df: pd.DataFrame, file_type: str) -> int:
        """Auto-detect the header row index (0-based).

        Scans up to MAX_HEADER_SCAN_ROWS rows; returns the first row where
        at least two cells contain any expected column-name variant for
        *file_type*, falling back to row 0.
        """
        expected_columns = set()
        for variants in COLUMN_MAPPING[file_type].values():
            expected_columns.update(v.lower() for v in variants)
        for idx in range(min(MAX_HEADER_SCAN_ROWS, len(df))):
            row = df.iloc[idx]
            row_values = [str(v).lower().strip() for v in row.values if pd.notna(v)]
            # Count cells containing any known header-name variant.
            matches = sum(1 for v in row_values if any(exp in v for exp in expected_columns))
            if matches >= 2:  # require at least 2 matching cells
                return idx
        return 0  # default: treat the first row as the header

    def map_columns(self, df: pd.DataFrame, file_type: str) -> Dict[str, str]:
        """Map DataFrame columns onto standard field names.

        Returns ``{original_column_name: standard_name}``. Two passes:
        1) exact case-insensitive match against the known variants;
        2) substring containment (either direction), only for standard
           fields and columns not claimed by pass 1.
        """
        mapping = {}
        column_map = COLUMN_MAPPING[file_type]
        df_columns = [str(c).lower().strip() for c in df.columns]
        # Pass 1: exact (case-insensitive) match.
        for standard_name, variants in column_map.items():
            variants_lower = [v.lower().strip() for v in variants]
            for idx, col in enumerate(df_columns):
                if col in variants_lower:
                    mapping[df.columns[idx]] = standard_name
                    print(f"[DEBUG] Exact Mapped '{df.columns[idx]}' to '{standard_name}'")
                    break
        # Pass 2: substring containment, only for columns not already mapped.
        mapped_indices = set()
        for col_name in mapping.keys():
            for idx, col in enumerate(df.columns):
                if col == col_name:
                    mapped_indices.add(idx)
        for standard_name, variants in column_map.items():
            if standard_name in mapping.values():
                continue
            for variant in variants:
                variant_lower = variant.lower().strip()
                if len(variant_lower) < 2:
                    continue  # too short: would match almost anything
                for idx, col in enumerate(df_columns):
                    if idx in mapped_indices:
                        continue
                    if variant_lower in col or col in variant_lower:
                        mapping[df.columns[idx]] = standard_name
                        mapped_indices.add(idx)
                        print(f"[DEBUG] Substring Mapped '{df.columns[idx]}' to '{standard_name}' (matched '{variant}')")
                        break
                if standard_name in mapping.values():
                    break
        print(f"[DEBUG] Final Mapping for {file_type}: {mapping}")
        return mapping

    def parse_file(self, file_path: Path, file_type: str) -> Tuple[str, Dict[str, Any]]:
        """Parse an Excel/CSV file and cache the normalized result.

        Returns ``(file_id, metadata)`` where metadata mirrors the cached
        entry minus the 'dataframe' key.
        """
        file_id = str(uuid.uuid4())
        is_csv = file_path.suffix.lower() == '.csv'
        # First read with no header so find_header_row can scan raw rows.
        if is_csv:
            encoding = self.detect_encoding(file_path)
            df = pd.read_csv(file_path, encoding=encoding, header=None)
        else:
            df = pd.read_excel(file_path, header=None)
        header_row = self.find_header_row(df, file_type)
        # Re-read with the detected header row so columns/dtypes are right.
        if is_csv:
            df = pd.read_csv(file_path, encoding=encoding, header=header_row)
        else:
            df = pd.read_excel(file_path, header=header_row)
        # Rename recognized columns to standard field names.
        column_mapping = self.map_columns(df, file_type)
        df = df.rename(columns=column_mapping)
        # Keep only the standard columns that were actually found.
        required_columns = list(COLUMN_MAPPING[file_type].keys())
        available_columns = [c for c in required_columns if c in df.columns]
        df = df[available_columns]
        # Drop rows that are entirely empty.
        df = df.dropna(how='all')
        # Preview rows are cleaned (NaN/Inf -> None) for JSON serialization.
        preview = clean_records(df.head(10).to_dict(orient='records'))
        parsed_data = {
            'file_id': file_id,
            'file_type': file_type,
            'filename': file_path.name,
            'header_row': header_row,
            'row_count': len(df),
            'columns': list(df.columns),
            'preview': preview,
            'dataframe': df,
        }
        self.parsed_files[file_id] = parsed_data
        return file_id, {k: v for k, v in parsed_data.items() if k != 'dataframe'}

    def get_parsed_data(self, file_id: str) -> Optional[pd.DataFrame]:
        """Return the cached DataFrame for *file_id*, or None if unknown."""
        entry = self.parsed_files.get(file_id)
        return entry.get('dataframe') if entry is not None else None

    def get_file_info(self, file_id: str) -> Optional[Dict]:
        """Return cached metadata for *file_id* (without the DataFrame), or None."""
        entry = self.parsed_files.get(file_id)
        if entry is None:
            return None
        return {k: v for k, v in entry.items() if k != 'dataframe'}
# Module-level singleton instance shared across the application.
excel_parser = ExcelParser()