# SalesPipeline/backend/app/services/excel_parser.py
import re
import uuid
import math
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import pandas as pd
import chardet
from openpyxl import load_workbook
from app.config import MAX_HEADER_SCAN_ROWS, UPLOAD_DIR
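
# MAX_HEADER_SCAN_ROWS (from app.config) caps how many leading rows
# find_header_row() scans when locating a header row.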


def clean_value(val):
    """Clean a single value, converting NaN/Inf to None so it can be JSON-serialized."""
    if val is None:
        return None
    if isinstance(val, float):
        if math.isnan(val) or math.isinf(val):
            return None
    if isinstance(val, str):
        val = val.lstrip("'")  # Remove leading apostrophe often added by Excel
    return val


def clean_dict(d: Dict) -> Dict:
    """Clean all NaN/Inf values in a dict."""
    return {k: clean_value(v) for k, v in d.items()}


def clean_records(records: List[Dict]) -> List[Dict]:
    """Clean all NaN/Inf values in a list of records."""
    return [clean_dict(r) for r in records]
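
# Illustrative behaviour of the cleaning helpers (hypothetical values):
#   clean_records([{'qty': float('nan'), 'pn': "'ABC-123"}])
#   -> [{'qty': None, 'pn': 'ABC-123'}]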


# Column-name mapping table: standard field -> list of accepted header variants
COLUMN_MAPPING = {
    'dit': {
        'op_id': ['opportunity no', 'opportunity', 'op編號', 'op 編號', 'op_id', 'opid', '案件編號', '案號', 'opportunity id'],
        'op_name': ['opportunity name', '專案名稱', '案件名稱'],
        'erp_account': ['erp account', 'account no', 'erp account no', '客戶代碼', '客戶編號', 'erp_account'],
        'customer': ['account name', 'branding customer', '客戶', '客戶名稱', 'customer', 'customer name', '公司名稱'],
        'pn': ['product name', '料號', 'part number', 'pn', 'part no', 'part_number', '產品料號', 'stage/part'],
        'eau': ['eau quantity', 'eau quantity (pcs)', 'eau', '年預估量', 'annual usage', '預估用量'],
        'stage': ['stage', 'oppty product stage', '階段', 'status', '狀態', '專案階段'],
        'date': ['created date', '日期', 'date', '建立日期', 'create date']
    },
    'sample': {
        'sample_id': ['樣品訂單號碼', 'item', '樣品編號', 'sample_id', 'sample id', '編號'],
        'order_no': ['樣品訂單號碼', '單號', 'order_no', 'order no', '樣品單號', '申請單號'],
        'oppy_no': ['oppy no', 'oppy_no', '案號', '案件編號', 'opportunity no'],
        'cust_id': ['cust id', 'cust_id', '客戶編號', '客戶代碼', '客戶代號'],
        'customer': ['客戶名稱', '客戶簡稱', '客戶', 'customer', 'customer name'],
        'pn': ['item', 'type', '料號', 'part number', 'pn', 'part no', '產品料號', '索樣數量'],
        'qty': ['索樣數量pcs', '索樣數量 k', '數量', 'qty', 'quantity', '申請數量'],
        'date': ['出貨日', '需求日', '日期', 'date', '申請日期']
    },
    'order': {
        'order_id': ['項次', '訂單編號', 'order_id', 'order id'],
        'order_no': ['訂單單號', '訂單號', 'order_no', 'order no', '銷貨單號'],
        'cust_id': ['客戶編號', '客戶代碼', '客戶代號', 'cust_id', 'cust id', 'erp code', 'erp_code', 'erpcode', 'erp'],
        'customer': ['客戶', '客戶名稱', 'customer', 'customer name'],
        'pn': ['內部料號', '料號', 'part number', 'pn', 'part no', '產品料號', 'type'],
        'qty': ['訂單量', '數量', 'qty', 'quantity', '訂購數量', '出貨數量'],
        'status': ['狀態', 'status', '訂單狀態'],
        'amount': ['原幣金額(含稅)', '台幣金額(未稅)', '金額', 'amount', 'total', '訂單金額'],
        'date': ['訂單日期', '日期', 'date', 'order date', 'order_date']
    }
}
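
# map_columns() matches case-insensitively and by substring in either direction,
# so a header such as "Opportunity No." resolves to 'op_id' via the
# 'opportunity no' variant above.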


class ExcelParser:
    def __init__(self):
        self.parsed_files: Dict[str, Dict] = {}
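        # parsed_files is keyed by a uuid4 file_id; each entry keeps the parsed
        # DataFrame in memory alongside JSON-safe metadata.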

    def detect_encoding(self, file_path: Path) -> str:
        """Detect the file encoding."""
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read(10000))
        return result.get('encoding') or 'utf-8'
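    # chardet samples only the first 10,000 bytes, so detection is a heuristic;
    # UTF-8 is the fallback when detection is inconclusive.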

    def find_header_row(self, df: pd.DataFrame, file_type: str) -> int:
        """Automatically detect the header row."""
        expected_columns = set()
        for variants in COLUMN_MAPPING[file_type].values():
            expected_columns.update([v.lower() for v in variants])
        for idx in range(min(MAX_HEADER_SCAN_ROWS, len(df))):
            row = df.iloc[idx]
            row_values = [str(v).lower().strip() for v in row.values if pd.notna(v)]
            # Check whether the row contains known column names
            matches = sum(1 for v in row_values if any(exp in v for exp in expected_columns))
            if matches >= 2:  # require at least 2 matching columns
                return idx
        return 0  # default: treat the first row as the header
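    # Note: a cell only needs to contain a known variant as a substring to count
    # toward the match threshold, so minor header decorations still match.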

    def map_columns(self, df: pd.DataFrame, file_type: str) -> Dict[str, str]:
        """Map DataFrame columns to the standard field names."""
        mapping = {}
        column_map = COLUMN_MAPPING[file_type]
        df_columns = [str(c).lower().strip() for c in df.columns]
        for standard_name, variants in column_map.items():
            for variant in variants:
                variant_lower = variant.lower()
                for idx, col in enumerate(df_columns):
                    if variant_lower in col or col in variant_lower:
                        mapping[df.columns[idx]] = standard_name
                        print(f"[DEBUG] Mapped '{df.columns[idx]}' to '{standard_name}' (matched '{variant}')")
                        break
                if standard_name in mapping.values():
                    break
        print(f"[DEBUG] Final Mapping for {file_type}: {mapping}")
        return mapping
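    # Illustrative result for an 'order' sheet (hypothetical headers):
    #   {'訂單單號': 'order_no', '客戶名稱': 'customer', '數量': 'qty'}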

    def parse_file(self, file_path: Path, file_type: str) -> Tuple[str, Dict[str, Any]]:
        """Parse an Excel/CSV file."""
        file_id = str(uuid.uuid4())

        # Read the file without a header so the header row can be located first
        if file_path.suffix.lower() == '.csv':
            encoding = self.detect_encoding(file_path)
            df = pd.read_csv(file_path, encoding=encoding, header=None)
        else:
            df = pd.read_excel(file_path, header=None)

        # Locate the header row
        header_row = self.find_header_row(df, file_type)

        # Re-read with the correct header row (reusing the detected encoding for CSV)
        if file_path.suffix.lower() == '.csv':
            df = pd.read_csv(file_path, encoding=encoding, header=header_row)
        else:
            df = pd.read_excel(file_path, header=header_row)

        # Map columns to standard names
        column_mapping = self.map_columns(df, file_type)
        df = df.rename(columns=column_mapping)

        # Keep only the required columns
        required_columns = list(COLUMN_MAPPING[file_type].keys())
        available_columns = [c for c in required_columns if c in df.columns]
        df = df[available_columns]

        # Drop rows that are entirely empty
        df = df.dropna(how='all')

        # Build preview data (NaN values cleaned for JSON serialization)
        preview = clean_records(df.head(10).to_dict(orient='records'))

        # Store the parsed result
        parsed_data = {
            'file_id': file_id,
            'file_type': file_type,
            'filename': file_path.name,
            'header_row': header_row,
            'row_count': len(df),
            'columns': list(df.columns),
            'preview': preview,
            'dataframe': df
        }
        self.parsed_files[file_id] = parsed_data
        return file_id, {k: v for k, v in parsed_data.items() if k != 'dataframe'}
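    # The raw DataFrame stays in self.parsed_files (process memory only); the
    # returned metadata excludes it so the response stays JSON-serializable.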

    def get_parsed_data(self, file_id: str) -> Optional[pd.DataFrame]:
        """Return the parsed DataFrame for a file id, if any."""
        if file_id in self.parsed_files:
            return self.parsed_files[file_id].get('dataframe')
        return None

    def get_file_info(self, file_id: str) -> Optional[Dict]:
        """Return file metadata (without the DataFrame) for a file id, if any."""
        if file_id in self.parsed_files:
            data = self.parsed_files[file_id]
            return {k: v for k, v in data.items() if k != 'dataframe'}
        return None


# Global instance
excel_parser = ExcelParser()
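

# Illustrative usage sketch (not part of the service): the file path and
# file_type below are hypothetical, chosen only to show the call sequence.
if __name__ == "__main__":
    demo_path = Path("uploads/dit_export.xlsx")  # hypothetical sample workbook
    if demo_path.exists():
        file_id, info = excel_parser.parse_file(demo_path, "dit")
        print(info["row_count"], info["columns"])
        df = excel_parser.get_parsed_data(file_id)
        if df is not None:
            print(df.head())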