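"""ETL routes: upload and parse Excel files, import the parsed rows into the
database (DIT / Sample / Order records), and read back or clear imported data."""
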
import shutil
import traceback
from datetime import datetime
from pathlib import Path
from typing import List

import pandas as pd
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from pydantic import BaseModel

from app.models import get_db
from app.models.dit import DitRecord
from app.models.sample import SampleRecord
from app.models.order import OrderRecord
from app.models.match import MatchResult, TargetType, ReviewLog
from app.config import UPLOAD_DIR
from app.services.excel_parser import excel_parser
from app.services.fuzzy_matcher import normalize_customer_name, sanitize_pn

router = APIRouter(prefix="/etl", tags=["ETL"])


class ParsedFileResponse(BaseModel):
    file_id: str
    file_type: str
    filename: str
    header_row: int
    row_count: int
    preview: List[dict]


class ImportRequest(BaseModel):
    file_id: str


class ImportResponse(BaseModel):
    imported_count: int
@router.post("/upload", response_model=ParsedFileResponse)
async def upload_file(
file: UploadFile = File(...),
file_type: str = Form(...),
db: Session = Depends(get_db)
):
"""上傳並解析 Excel 檔案"""
if file_type not in ['dit', 'sample', 'order']:
raise HTTPException(status_code=400, detail="Invalid file type")
# 儲存檔案
file_path = UPLOAD_DIR / file.filename
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
try:
# 解析檔案
file_id, file_info = excel_parser.parse_file(file_path, file_type)
return ParsedFileResponse(
file_id=file_id,
file_type=file_info['file_type'],
filename=file_info['filename'],
header_row=file_info['header_row'],
row_count=file_info['row_count'],
preview=file_info['preview']
)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to parse file: {str(e)}")
@router.get("/preview/{file_id}", response_model=ParsedFileResponse)
def get_preview(file_id: str):
"""取得檔案預覽"""
file_info = excel_parser.get_file_info(file_id)
if not file_info:
raise HTTPException(status_code=404, detail="File not found")
return ParsedFileResponse(
file_id=file_info['file_id'],
file_type=file_info['file_type'],
filename=file_info['filename'],
header_row=file_info['header_row'],
row_count=file_info['row_count'],
preview=file_info['preview']
)


def clean_value(val, default=''):
    """Clean a cell value, handling NaN and empty values."""
    if val is None or (isinstance(val, float) and pd.isna(val)):
        return default
    str_val = str(val).strip()
    # Remove the leading apostrophe Excel often adds to force text (e.g. '001)
    str_val = str_val.lstrip("'")
    if str_val.lower() in ('nan', 'none', 'null', ''):
        return default
    return str_val
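
# Illustrative behavior of clean_value (hypothetical inputs, not from the source):
#   clean_value("'001")         -> "001"   (Excel's leading apostrophe stripped)
#   clean_value(float("nan"))   -> ""      (falls back to the default)
#   clean_value(" NULL ", None) -> None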


def normalize_date(val):
    """Normalize a date value to the YYYY-MM-DD format."""
    val = clean_value(val, None)
    if not val:
        return None
    # Drop any time component first; Excel's default string form is often
    # like "2025/9/30" or "2025/9/30 00:00:00".
    date_part = val.split(' ')[0]
    # Try the common date formats
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d-%b-%y"):
        try:
            return datetime.strptime(date_part, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return val  # Return the original value if parsing failed
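
# Illustrative behavior of normalize_date (hypothetical inputs):
#   normalize_date("2025/9/30 00:00:00") -> "2025-09-30"
#   normalize_date("30-Sep-25")          -> "2025-09-30"
#   normalize_date("Q3 2025")            -> "Q3 2025"  (unparseable, returned as-is)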
@router.post("/import", response_model=ImportResponse)
def import_data(request: ImportRequest, db: Session = Depends(get_db)):
"""匯入資料到資料庫"""
import traceback
try:
file_info = excel_parser.get_file_info(request.file_id)
if not file_info:
print(f"[ETL Import] Error: File not found for file_id={request.file_id}")
raise HTTPException(status_code=404, detail="File not found")
df = excel_parser.get_parsed_data(request.file_id)
if df is None:
print(f"[ETL Import] Error: Parsed data not found for file_id={request.file_id}")
raise HTTPException(status_code=404, detail="Parsed data not found")
print(f"[ETL Import] Starting import: file_type={file_info['file_type']}, rows={len(df)}")
file_type = file_info['file_type']
imported_count = 0
seen_ids = set() # 追蹤已處理的 ID避免檔案內重複
        # Clear old data of this type first to avoid duplicate-key conflicts
        try:
            if file_type == 'dit':
                print("[ETL Import] Clearing old DIT records and dependent matches/logs...")
                # First delete the review logs and match results tied to DIT
                db.query(ReviewLog).delete()
                db.query(MatchResult).delete()
                db.query(DitRecord).delete()
            elif file_type == 'sample':
                print("[ETL Import] Clearing old Sample records and dependent matches/logs...")
                # Clearing the Sample-related match results (and their logs) is a
                # bit involved because ReviewLog is linked through MatchResult.
                # Since we are wiping the whole category anyway, deleting every
                # ReviewLog plus the matching MatchResult rows is the safest option.
                db.query(ReviewLog).delete()
                db.query(MatchResult).filter(MatchResult.target_type == TargetType.SAMPLE).delete()
                db.query(SampleRecord).delete()
            elif file_type == 'order':
                print("[ETL Import] Clearing old Order records and dependent matches/logs...")
                db.query(ReviewLog).delete()
                db.query(MatchResult).filter(MatchResult.target_type == TargetType.ORDER).delete()
                db.query(OrderRecord).delete()
            db.flush()  # Flush instead of commit, so everything stays in one transaction
            print("[ETL Import] Old data cleared successfully.")
        except Exception as e:
            db.rollback()
            print(f"[ETL Import] Error clearing old data: {traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=f"Failed to clear old data: {str(e)}")
        for idx, row in df.iterrows():
            try:
                if file_type == 'dit':
                    op_id = clean_value(row.get('op_id'), '')
                    erp_account = clean_value(row.get('erp_account'), '')
                    customer = clean_value(row.get('customer'))
                    pn = clean_value(row.get('pn'))
                    # Skip invalid rows and duplicate op_id + pn combinations
                    unique_key = f"{op_id}|{pn}"
                    if not op_id or unique_key in seen_ids:
                        continue
                    seen_ids.add(unique_key)
                    record = DitRecord(
                        op_id=op_id,
                        op_name=clean_value(row.get('op_name')),
                        erp_account=erp_account,
                        customer=customer,
                        customer_normalized=normalize_customer_name(customer),
                        pn=sanitize_pn(pn),
                        eau=int(row.get('eau', 0)) if row.get('eau') and not pd.isna(row.get('eau')) else 0,
                        stage=clean_value(row.get('stage')),
                        date=normalize_date(row.get('date'))
                    )
                elif file_type == 'sample':
                    sample_id = clean_value(row.get('sample_id'), f'S{idx}')
                    oppy_no = clean_value(row.get('oppy_no'), '')
                    cust_id = clean_value(row.get('cust_id'), '')
                    customer = clean_value(row.get('customer'))
                    pn = clean_value(row.get('pn'))
                    # Skip duplicate sample_id values
                    if sample_id in seen_ids:
                        continue
                    seen_ids.add(sample_id)
                    record = SampleRecord(
                        sample_id=sample_id,
                        order_no=clean_value(row.get('order_no')),
                        oppy_no=oppy_no,
                        cust_id=cust_id,
                        customer=customer,
                        customer_normalized=normalize_customer_name(customer),
                        pn=sanitize_pn(pn),
                        qty=int(row.get('qty', 0)) if row.get('qty') and not pd.isna(row.get('qty')) else 0,
                        date=normalize_date(row.get('date'))
                    )
                elif file_type == 'order':
                    order_id = clean_value(row.get('order_id'), f'O{idx}')
                    cust_id = clean_value(row.get('cust_id'), '')
                    customer = clean_value(row.get('customer'))
                    pn = clean_value(row.get('pn'))
                    # Skip duplicate order_id values
                    if order_id in seen_ids:
                        continue
                    seen_ids.add(order_id)
                    record = OrderRecord(
                        order_id=order_id,
                        order_no=clean_value(row.get('order_no')),
                        cust_id=cust_id,
                        customer=customer,
                        customer_normalized=normalize_customer_name(customer),
                        pn=sanitize_pn(pn),
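                        # qty below is scaled by 1000; presumably the source
                        # sheet records it in K pcs (an assumption, not stated here)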
                        qty=int(float(row.get('qty', 0)) * 1000) if row.get('qty') and not pd.isna(row.get('qty')) else 0,
                        status=clean_value(row.get('status'), 'Backlog'),
                        amount=float(row.get('amount', 0)) if row.get('amount') and not pd.isna(row.get('amount')) else 0,
                        date=normalize_date(row.get('date'))
                    )
                else:
                    continue
                db.add(record)
                imported_count += 1
                if imported_count % 500 == 0:
                    print(f"[ETL Import] Processed {imported_count} rows...")
            except Exception as e:
                print(f"[ETL Import] Error importing row {idx}: {e}")
                continue
        try:
            print(f"[ETL Import] Committing {imported_count} records...")
            db.commit()
            print(f"[ETL Import] Import successful: {imported_count} records.")
        except Exception as e:
            db.rollback()
            print(f"[ETL Import] Commit Error: {traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=f"Failed to commit data: {str(e)}")
        return ImportResponse(imported_count=imported_count)
    except HTTPException:
        raise
    except Exception as e:
        print(f"[ETL Import] Unhandled Exception: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@router.get("/data/{data_type}")
def get_data(data_type: str, db: Session = Depends(get_db)):
"""取得已匯入的資料"""
if data_type == 'dit':
records = db.query(DitRecord).all()
elif data_type == 'sample':
records = db.query(SampleRecord).all()
elif data_type == 'order':
records = db.query(OrderRecord).all()
else:
raise HTTPException(status_code=400, detail="Invalid data type")
return [
{
**{c.name: getattr(record, c.name) for c in record.__table__.columns}
}
for record in records
]
@router.delete("/data")
def clear_all_data(db: Session = Depends(get_db)):
"""清除所有匯入的資料與分析結果"""
try:
print("[ETL] Clearing all data...")
db.query(ReviewLog).delete()
db.query(MatchResult).delete()
db.query(DitRecord).delete()
db.query(SampleRecord).delete()
db.query(OrderRecord).delete()
db.commit()
print("[ETL] All data cleared successfully.")
return {"message": "All data cleared successfully"}
except Exception as e:
db.rollback()
print(f"[ETL] Error clearing data: {e}")
raise HTTPException(status_code=500, detail=str(e))