import re
import uuid
import math
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import pandas as pd
import chardet
from openpyxl import load_workbook

from app.config import MAX_HEADER_SCAN_ROWS, UPLOAD_DIR


def clean_value(val):
    """Clean a single value, converting NaN/Inf to None so it can be JSON-serialized."""
    if val is None:
        return None
    if isinstance(val, float):
        if math.isnan(val) or math.isinf(val):
            return None
    return val


def clean_dict(d: Dict) -> Dict:
    """Clean all NaN/Inf values in a dict."""
    return {k: clean_value(v) for k, v in d.items()}


def clean_records(records: List[Dict]) -> List[Dict]:
    """Clean all NaN/Inf values in a list of records."""
    return [clean_dict(r) for r in records]


# Column name mapping table (variants are matched against raw spreadsheet headers,
# so the Chinese header strings are kept verbatim)
COLUMN_MAPPING = {
    'dit': {
        'op_id': ['opportunity name', 'opportunity no', 'opportunity', 'op編號', 'op 編號', 'op_id', 'opid', '案件編號', '案號', 'opportunity id'],
        'erp_account': ['erp account', 'account no', 'erp account no', '客戶代碼', '客戶編號', 'erp_account'],
        'customer': ['account name', 'branding customer', '客戶', '客戶名稱', 'customer', 'customer name', '公司名稱'],
        'pn': ['product name', '料號', 'part number', 'pn', 'part no', 'part_number', '產品料號', 'stage/part'],
        'eau': ['eau quantity', 'eau quantity (pcs)', 'eau', '年預估量', 'annual usage', '預估用量'],
        'stage': ['stage', 'oppty product stage', '階段', 'status', '狀態', '專案階段'],
        'date': ['created date', '日期', 'date', '建立日期', 'create date']
    },
    'sample': {
        'sample_id': ['樣品訂單號碼', 'item', '樣品編號', 'sample_id', 'sample id', '編號'],
        'order_no': ['樣品訂單號碼', '單號', 'order_no', 'order no', '樣品單號', '申請單號'],
        'oppy_no': ['oppy no', 'oppy_no', '案號', '案件編號', 'opportunity no'],
        'cust_id': ['cust id', 'cust_id', '客戶編號', '客戶代碼', '客戶代號'],
        'customer': ['客戶名稱', '客戶簡稱', '客戶', 'customer', 'customer name'],
        'pn': ['item', 'type', '料號', 'part number', 'pn', 'part no', '產品料號', '索樣數量'],
        'qty': ['索樣數量pcs', '索樣數量 k', '數量', 'qty', 'quantity', '申請數量'],
        'date': ['需求日', '日期', 'date', '申請日期']
    },
    'order': {
        'order_id': ['項次', '訂單編號', 'order_id', 'order id'],
        'order_no': ['訂單單號', '訂單號', 'order_no', 'order no', '銷貨單號'],
        'cust_id': ['客戶編號', '客戶代碼', '客戶代號', 'cust_id', 'cust id'],
        'customer': ['客戶', '客戶名稱', 'customer', 'customer name'],
        'pn': ['type', '內部料號', '料號', 'part number', 'pn', 'part no', '產品料號'],
        'qty': ['訂單量', '數量', 'qty', 'quantity', '訂購數量', '出貨數量'],
        'status': ['狀態', 'status', '訂單狀態'],
        'amount': ['原幣金額(含稅)', '台幣金額(未稅)', '金額', 'amount', 'total', '訂單金額']
    }
}


class ExcelParser:
    def __init__(self):
        self.parsed_files: Dict[str, Dict] = {}

    def detect_encoding(self, file_path: Path) -> str:
        """Detect the file encoding."""
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read(10000))
        # chardet may report encoding=None when detection fails, so fall back explicitly
        return result.get('encoding') or 'utf-8'

    def find_header_row(self, df: pd.DataFrame, file_type: str) -> int:
        """Automatically detect the header row."""
        expected_columns = set()
        for variants in COLUMN_MAPPING[file_type].values():
            expected_columns.update([v.lower() for v in variants])

        for idx in range(min(MAX_HEADER_SCAN_ROWS, len(df))):
            row = df.iloc[idx]
            row_values = [str(v).lower().strip() for v in row.values if pd.notna(v)]
            # Count how many expected column names appear in this row
            matches = sum(1 for v in row_values if any(exp in v for exp in expected_columns))
            if matches >= 2:  # require at least 2 matching columns
                return idx

        return 0  # fall back to the first row as the header

    def map_columns(self, df: pd.DataFrame, file_type: str) -> Dict[str, str]:
        """Map DataFrame columns to the standard column names."""
        mapping = {}
        column_map = COLUMN_MAPPING[file_type]
        df_columns = [str(c).lower().strip() for c in df.columns]

        for standard_name, variants in column_map.items():
            for variant in variants:
                variant_lower = variant.lower()
                for idx, col in enumerate(df_columns):
                    if variant_lower in col or col in variant_lower:
                        mapping[df.columns[idx]] = standard_name
                        break
                if standard_name in mapping.values():
                    break

        return mapping

    def parse_file(self, file_path: Path, file_type: str) -> Tuple[str, Dict[str, Any]]:
        """Parse an Excel/CSV file."""
        file_id = str(uuid.uuid4())

        # Read the file without assuming a header position
        if file_path.suffix.lower() == '.csv':
            encoding = self.detect_encoding(file_path)
            df = pd.read_csv(file_path, encoding=encoding, header=None)
        else:
            df = pd.read_excel(file_path, header=None)

        # Locate the header row
        header_row = self.find_header_row(df, file_type)

        # Re-read with the correct header
        if file_path.suffix.lower() == '.csv':
            df = pd.read_csv(file_path, encoding=encoding, header=header_row)
        else:
            df = pd.read_excel(file_path, header=header_row)

        # Map columns to standard names
        column_mapping = self.map_columns(df, file_type)
        df = df.rename(columns=column_mapping)

        # Keep only the columns we need
        required_columns = list(COLUMN_MAPPING[file_type].keys())
        available_columns = [c for c in required_columns if c in df.columns]
        df = df[available_columns]

        # Drop fully empty rows
        df = df.dropna(how='all')

        # Build preview data (clean NaN values so they can be JSON-serialized)
        preview = clean_records(df.head(10).to_dict(orient='records'))

        # Store the parsed result
        parsed_data = {
            'file_id': file_id,
            'file_type': file_type,
            'filename': file_path.name,
            'header_row': header_row,
            'row_count': len(df),
            'columns': list(df.columns),
            'preview': preview,
            'dataframe': df
        }
        self.parsed_files[file_id] = parsed_data

        return file_id, {k: v for k, v in parsed_data.items() if k != 'dataframe'}

    def get_parsed_data(self, file_id: str) -> Optional[pd.DataFrame]:
        """Return the parsed DataFrame for a file."""
        if file_id in self.parsed_files:
            return self.parsed_files[file_id].get('dataframe')
        return None

    def get_file_info(self, file_id: str) -> Optional[Dict]:
        """Return file metadata (everything except the DataFrame)."""
        if file_id in self.parsed_files:
            data = self.parsed_files[file_id]
            return {k: v for k, v in data.items() if k != 'dataframe'}
        return None


# Global instance
excel_parser = ExcelParser()
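

# Usage sketch (illustrative only): "orders_2024.xlsx" is a hypothetical file name
# assumed to exist under UPLOAD_DIR, and UPLOAD_DIR is assumed to be path-like;
# file_type must be one of the COLUMN_MAPPING keys ('dit', 'sample', 'order').
if __name__ == "__main__":
    sample_path = Path(UPLOAD_DIR) / "orders_2024.xlsx"  # hypothetical uploaded file
    file_id, info = excel_parser.parse_file(sample_path, file_type="order")
    print(info["filename"], info["header_row"], info["row_count"])
    print(info["columns"])
    df = excel_parser.get_parsed_data(file_id)  # full DataFrame for downstream processing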