import shutil
from datetime import datetime
from pathlib import Path
from typing import List

import pandas as pd
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.models import get_db
from app.models.dit import DitRecord
from app.models.sample import SampleRecord
from app.models.order import OrderRecord
from app.models.match import MatchResult, TargetType, ReviewLog
from app.config import UPLOAD_DIR
from app.services.excel_parser import excel_parser
from app.services.fuzzy_matcher import normalize_customer_name, sanitize_pn

router = APIRouter(prefix="/etl", tags=["ETL"])

# The only data categories this ETL pipeline knows how to ingest.
VALID_FILE_TYPES = ('dit', 'sample', 'order')


class ParsedFileResponse(BaseModel):
    """Metadata and preview rows for a parsed Excel upload."""
    file_id: str
    file_type: str
    filename: str
    header_row: int
    row_count: int
    preview: List[dict]


class ImportRequest(BaseModel):
    """Request body for /etl/import: references a previously parsed file."""
    file_id: str


class ImportResponse(BaseModel):
    """Result of an import: number of rows actually persisted."""
    imported_count: int


@router.post("/upload", response_model=ParsedFileResponse)
async def upload_file(
    file: UploadFile = File(...),
    file_type: str = Form(...),
    db: Session = Depends(get_db)
):
    """Upload and parse an Excel file.

    Saves the upload under UPLOAD_DIR, hands it to the excel parser, and
    returns parse metadata plus a row preview. Raises 400 for an unknown
    file_type or a parse failure.
    """
    if file_type not in VALID_FILE_TYPES:
        raise HTTPException(status_code=400, detail="Invalid file type")

    # Keep only the basename: a crafted client filename such as
    # "../../evil.xlsx" must not be able to escape the upload directory.
    safe_name = Path(file.filename or "upload.xlsx").name
    file_path = UPLOAD_DIR / safe_name
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    try:
        file_id, file_info = excel_parser.parse_file(file_path, file_type)
        return ParsedFileResponse(
            file_id=file_id,
            file_type=file_info['file_type'],
            filename=file_info['filename'],
            header_row=file_info['header_row'],
            row_count=file_info['row_count'],
            preview=file_info['preview'],
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to parse file: {str(e)}")


@router.get("/preview/{file_id}", response_model=ParsedFileResponse)
def get_preview(file_id: str):
    """Return the cached parse metadata/preview for an uploaded file."""
    file_info = excel_parser.get_file_info(file_id)
    if not file_info:
        raise HTTPException(status_code=404, detail="File not found")
    return ParsedFileResponse(
        file_id=file_info['file_id'],
        file_type=file_info['file_type'],
        filename=file_info['filename'],
        header_row=file_info['header_row'],
        row_count=file_info['row_count'],
        preview=file_info['preview'],
    )


def clean_value(val, default=''):
    """Normalize a cell value to a clean string, mapping NaN/empty to *default*.

    Strips whitespace, drops the leading apostrophe Excel adds to force
    text cells (e.g. '001), and treats 'nan'/'none'/'null' strings as empty.
    """
    if val is None or (isinstance(val, float) and pd.isna(val)):
        return default
    str_val = str(val).strip()
    # Remove leading apostrophe often added by Excel (e.g. '001)
    str_val = str_val.lstrip("'")
    if str_val.lower() in ('nan', 'none', 'null', ''):
        return default
    return str_val


def normalize_date(val):
    """Normalize a date-like value to 'YYYY-MM-DD'.

    Returns None for blank values, and the original string unchanged when
    no known format matches (so bad data stays visible downstream).
    """
    val = clean_value(val, None)
    if not val:
        return None
    # Drop any time component (e.g. "2025/9/30 00:00:00") up front, then try
    # the date-only formats; strptime accepts non-zero-padded month/day.
    date_part = val.split(' ')[0]
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d-%b-%y"):
        try:
            return datetime.strptime(date_part, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return val  # Return original if parse failed


def _safe_int(val, default=0):
    """Best-effort int conversion; None/NaN/garbage yields *default*.

    Goes through float() first so float-formatted strings such as "1000.0"
    (common in Excel exports) convert instead of raising and dropping the row.
    """
    if val is None or (isinstance(val, float) and pd.isna(val)):
        return default
    try:
        return int(float(val))
    except (TypeError, ValueError):
        return default


def _safe_float(val, default=0.0):
    """Best-effort float conversion; None/NaN/garbage yields *default*."""
    if val is None or (isinstance(val, float) and pd.isna(val)):
        return default
    try:
        return float(val)
    except (TypeError, ValueError):
        return default


def _clear_old_data(db: Session, file_type: str):
    """Delete previously imported records of *file_type* plus dependent rows.

    Uses flush (not commit) so the deletes stay in the same transaction as
    the inserts that follow. ReviewLog rows hang off MatchResult rows, so
    they go first; we clear ALL review logs even for a single type because
    they are only meaningful relative to a consistent match set.
    """
    db.query(ReviewLog).delete()
    if file_type == 'dit':
        print("[ETL Import] Clearing old DIT records and dependent matches/logs...")
        db.query(MatchResult).delete()
        db.query(DitRecord).delete()
    elif file_type == 'sample':
        print("[ETL Import] Clearing old Sample records and dependent matches/logs...")
        db.query(MatchResult).filter(MatchResult.target_type == TargetType.SAMPLE).delete()
        db.query(SampleRecord).delete()
    elif file_type == 'order':
        print("[ETL Import] Clearing old Order records and dependent matches/logs...")
        db.query(MatchResult).filter(MatchResult.target_type == TargetType.ORDER).delete()
        db.query(OrderRecord).delete()
    db.flush()
    print("[ETL Import] Old data cleared successfully.")


def _build_dit_record(row, idx, seen_ids):
    """Build a DitRecord from a dataframe row, or None for blank/duplicate rows."""
    op_id = clean_value(row.get('op_id'), '')
    customer = clean_value(row.get('customer'))
    pn = clean_value(row.get('pn'))
    # Skip empty rows and duplicate op_id + pn combinations within the file.
    unique_key = f"{op_id}|{pn}"
    if not op_id or unique_key in seen_ids:
        return None
    seen_ids.add(unique_key)
    return DitRecord(
        op_id=op_id,
        op_name=clean_value(row.get('op_name')),
        erp_account=clean_value(row.get('erp_account'), ''),
        customer=customer,
        customer_normalized=normalize_customer_name(customer),
        pn=sanitize_pn(pn),
        eau=_safe_int(row.get('eau')),
        stage=clean_value(row.get('stage')),
        date=normalize_date(row.get('date')),
    )


def _build_sample_record(row, idx, seen_ids):
    """Build a SampleRecord from a dataframe row, or None for duplicates."""
    sample_id = clean_value(row.get('sample_id'), f'S{idx}')
    if sample_id in seen_ids:
        return None
    seen_ids.add(sample_id)
    customer = clean_value(row.get('customer'))
    return SampleRecord(
        sample_id=sample_id,
        order_no=clean_value(row.get('order_no')),
        oppy_no=clean_value(row.get('oppy_no'), ''),
        cust_id=clean_value(row.get('cust_id'), ''),
        customer=customer,
        customer_normalized=normalize_customer_name(customer),
        pn=sanitize_pn(clean_value(row.get('pn'))),
        qty=_safe_int(row.get('qty')),
        date=normalize_date(row.get('date')),
    )


def _build_order_record(row, idx, seen_ids):
    """Build an OrderRecord from a dataframe row, or None for duplicates."""
    order_id = clean_value(row.get('order_id'), f'O{idx}')
    if order_id in seen_ids:
        return None
    seen_ids.add(order_id)
    customer = clean_value(row.get('customer'))
    return OrderRecord(
        order_id=order_id,
        order_no=clean_value(row.get('order_no')),
        cust_id=clean_value(row.get('cust_id'), ''),
        customer=customer,
        customer_normalized=normalize_customer_name(customer),
        pn=sanitize_pn(clean_value(row.get('pn'))),
        # Source quantities are in thousands (K units); store raw units.
        qty=int(_safe_float(row.get('qty')) * 1000),
        status=clean_value(row.get('status'), 'Backlog'),
        amount=_safe_float(row.get('amount')),
        date=normalize_date(row.get('date')),
    )


# Per-type row builders; each returns a model instance or None (skip row).
_RECORD_BUILDERS = {
    'dit': _build_dit_record,
    'sample': _build_sample_record,
    'order': _build_order_record,
}


@router.post("/import", response_model=ImportResponse)
def import_data(request: ImportRequest, db: Session = Depends(get_db)):
    """Import a previously parsed file into the database.

    Clears the old records of the same type (plus dependent match results
    and review logs) in the same transaction, then inserts row by row,
    skipping rows that fail to convert. Raises 404 when the file or its
    parsed data is missing, 500 on clear/commit failures.
    """
    import traceback
    try:
        file_info = excel_parser.get_file_info(request.file_id)
        if not file_info:
            print(f"[ETL Import] Error: File not found for file_id={request.file_id}")
            raise HTTPException(status_code=404, detail="File not found")

        df = excel_parser.get_parsed_data(request.file_id)
        if df is None:
            print(f"[ETL Import] Error: Parsed data not found for file_id={request.file_id}")
            raise HTTPException(status_code=404, detail="Parsed data not found")

        file_type = file_info['file_type']
        print(f"[ETL Import] Starting import: file_type={file_type}, rows={len(df)}")

        builder = _RECORD_BUILDERS.get(file_type)
        imported_count = 0
        seen_ids = set()  # tracks IDs already processed to skip in-file duplicates

        # Remove old data of this type first to avoid duplicate-key conflicts.
        try:
            _clear_old_data(db, file_type)
        except Exception as e:
            db.rollback()
            print(f"[ETL Import] Error clearing old data: {traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=f"Failed to clear old data: {str(e)}")

        for idx, row in df.iterrows():
            try:
                if builder is None:
                    continue
                record = builder(row, idx, seen_ids)
                if record is None:
                    continue
                db.add(record)
                imported_count += 1
                if imported_count % 500 == 0:
                    print(f"[ETL Import] Processed {imported_count} rows...")
            except Exception as e:
                # Best-effort import: log the bad row and keep going.
                print(f"[ETL Import] Error importing row {idx}: {e}")
                continue

        try:
            print(f"[ETL Import] Committing {imported_count} records...")
            db.commit()
            print(f"[ETL Import] Import successful: {imported_count} records.")
        except Exception as e:
            db.rollback()
            print(f"[ETL Import] Commit Error: {traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=f"Failed to commit data: {str(e)}")

        return ImportResponse(imported_count=imported_count)
    except HTTPException:
        raise
    except Exception as e:
        print(f"[ETL Import] Unhandled Exception: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")


@router.get("/data/{data_type}")
def get_data(data_type: str, db: Session = Depends(get_db)):
    """Return all imported records of the given type as plain dicts."""
    model_map = {'dit': DitRecord, 'sample': SampleRecord, 'order': OrderRecord}
    model = model_map.get(data_type)
    if model is None:
        raise HTTPException(status_code=400, detail="Invalid data type")
    return [
        {c.name: getattr(record, c.name) for c in record.__table__.columns}
        for record in db.query(model).all()
    ]


@router.delete("/data")
def clear_all_data(db: Session = Depends(get_db)):
    """Delete every imported record and all analysis results.

    Deletion order matters: review logs reference match results, which
    reference the source records, so children are removed first.
    """
    try:
        print("[ETL] Clearing all data...")
        db.query(ReviewLog).delete()
        db.query(MatchResult).delete()
        db.query(DitRecord).delete()
        db.query(SampleRecord).delete()
        db.query(OrderRecord).delete()
        db.commit()
        print("[ETL] All data cleared successfully.")
        return {"message": "All data cleared successfully"}
    except Exception as e:
        db.rollback()
        print(f"[ETL] Error clearing data: {e}")
        raise HTTPException(status_code=500, detail=str(e))