from typing import List, Optional
|
||
from datetime import datetime, timedelta
|
||
from fastapi import APIRouter, Depends, Query
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy import func, and_
|
||
from pydantic import BaseModel
|
||
from app.models import get_db
|
||
from app.models.sample import SampleRecord
|
||
from app.models.order import OrderRecord
|
||
|
||
router = APIRouter(prefix="/lab", tags=["Lab"])
|
||
|
||
class LabKPI(BaseModel):
    """Aggregate KPI figures for lab sample-to-order conversion."""
    converted_count: int  # total number of sample groups that converted into orders
    avg_velocity: float  # average conversion time, in days
    conversion_rate: float  # conversion ratio (%)
    orphan_count: int  # total number of orphan samples (no matching order)
|
||
|
||
class ConversionRecord(BaseModel):
    """One sample record that was matched to at least one order."""
    customer: str  # customer display name, taken from the sample record
    pn: str  # part number as recorded on the sample
    sample_date: str  # raw sample date string from the record
    sample_qty: int  # sample quantity (0 when missing)
    order_date: str  # first matching order's date, formatted YYYY-MM-DD
    order_qty: int  # total qty across all matched orders for this customer/PN
    days_to_convert: int  # days from sample date to first order date
|
||
|
||
# Response models for the scatter chart and the orphan-sample list.
|
||
class ScatterPoint(BaseModel):
    """Aggregated (customer, PN) point for the sample-vs-order scatter plot."""
    customer: str  # display customer name (first seen in the group)
    pn: str  # display part number (first seen in the group)
    sample_qty: int  # summed sample qty for this customer/PN group
    order_qty: int  # summed matched order qty for the group (0 if none)
|
||
|
||
class OrphanSample(BaseModel):
    """A sample past the orphan threshold with no matching order."""
    customer: str
    pn: str
    days_since_sent: int  # days elapsed since the sample's date
    order_no: str  # document number from the sample record
    date: str  # raw sample date string
|
||
|
||
# NOTE: parse_date is defined further below in this module, after get_conversions.
|
||
|
||
|
||
|
||
# Helper to build order lookups
|
||
from app.services.fuzzy_matcher import normalize_pn_for_matching, normalize_customer_name
|
||
|
||
def build_order_lookups(orders):
    """Index order rows two ways for sample matching.

    Returns a pair of dicts:
      * keyed by (cust_id, normalized PN) -- only for orders with a cust_id
      * keyed by (normalized customer name, normalized PN)
    Each value is a list of {"date", "qty", "order_no"} entries.
    """
    by_id = {}
    by_name = {}

    for order in orders:
        pn_key = normalize_pn_for_matching(order.pn)
        cust_id = order.cust_id.strip().upper() if order.cust_id else ""
        cust_name = normalize_customer_name(order.customer)

        # Prefer the parsed order date; fall back to created_at (made naive),
        # then datetime.max so undated orders sort last.
        order_date = parse_date(order.date)
        if order_date is None:
            order_date = order.created_at.replace(tzinfo=None) if order.created_at else datetime.max

        entry = {
            "date": order_date,
            "qty": order.qty or 0,
            "order_no": order.order_no
        }

        if cust_id:
            by_id.setdefault((cust_id, pn_key), []).append(entry)

        by_name.setdefault((cust_name, pn_key), []).append(entry)

    return by_id, by_name
|
||
|
||
@router.get("/conversions", response_model=List[ConversionRecord])
def get_conversions(db: Session = Depends(get_db)):
    """List every sample record that converted into at least one order.

    Matching is done per sample row: first by (cust_id, PN), falling back
    to (normalized customer name, PN). The result is sorted by the sample
    date string, newest first.
    """
    samples = db.query(SampleRecord).all()
    orders = db.query(OrderRecord).all()

    lookup_by_id, lookup_by_name = build_order_lookups(orders)

    records = []

    # Each converted sample record becomes one row; the order qty shown is
    # the combined total of every matched order for that customer/PN.
    for sample in samples:
        pn_key = normalize_pn_for_matching(sample.pn)
        name_key = normalize_customer_name(sample.customer)
        cust_id = sample.cust_id.strip().upper() if sample.cust_id else ""
        sample_date = parse_date(sample.date)

        # 1. ID match first; 2. customer-name match only as a fallback.
        matches = list(lookup_by_id.get((cust_id, pn_key), [])) if cust_id else []
        if not matches:
            matches = list(lookup_by_name.get((name_key, pn_key), []))

        if not matches or sample_date is None:
            continue

        first = min(matches, key=lambda m: m["date"])
        days_diff = (first["date"] - sample_date).days
        # Orders that pre-date the sample are kept (days_to_convert < 0);
        # filtering them is left optional, as before.

        records.append(ConversionRecord(
            customer=sample.customer,
            pn=sample.pn,
            sample_date=sample.date,
            sample_qty=sample.qty or 0,
            order_date=first["date"].strftime("%Y-%m-%d"),
            order_qty=sum(m["qty"] for m in matches),
            days_to_convert=days_diff
        ))

    records.sort(key=lambda r: r.sample_date, reverse=True)
    return records
|
||
|
||
def parse_date(date_str: str) -> Optional[datetime]:
    """Parse a date string in the formats seen in sample/order data.

    Accepts YYYYMMDD, YYYY-MM-DD, YYYY/MM/DD and DD-Mon-YY. Any trailing
    time component (e.g. "2024-01-15 10:30:00") is ignored. Returns None
    for empty or unparseable input.
    """
    if not date_str:
        return None
    val = str(date_str).strip()

    # Compact YYYYMMDD form first.
    if len(val) == 8 and val.isdigit():
        try:
            return datetime.strptime(val, "%Y%m%d")
        except ValueError:
            pass

    # Only the date part matters, so drop any time suffix once up front.
    # (The previous version also listed "... %H:%M:%S" formats, but both
    # the value and the format string were truncated at the first space,
    # making those entries dead duplicates of the date-only formats.)
    date_part = val.split(' ')[0]
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d-%b-%y"):
        try:
            return datetime.strptime(date_part, fmt)
        except ValueError:
            continue
    return None
|
||
|
||
@router.get("/kpi", response_model=LabKPI)
def get_lab_kpi(
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    db: Session = Depends(get_db)
):
    """Compute lab KPIs over an optional [start_date, end_date] window.

    A "project" is a unique (normalized customer name, normalized PN)
    sample group. KPIs:
      * converted_count -- projects with at least one matching order
      * avg_velocity    -- mean days from earliest sample to first order
                           (negative gaps, order before sample, excluded)
      * conversion_rate -- converted projects / total projects, percent
      * orphan_count    -- unmatched projects whose earliest sample is
                           more than 90 days old
    """
    samples_query = db.query(SampleRecord)
    orders_query = db.query(OrderRecord)

    # NOTE(review): filters compare the raw date string columns; assumes the
    # stored values are zero-padded and lexicographically ordered -- confirm.
    if start_date:
        samples_query = samples_query.filter(SampleRecord.date >= start_date)
        orders_query = orders_query.filter(OrderRecord.date >= start_date)
    if end_date:
        samples_query = samples_query.filter(SampleRecord.date <= end_date)
        orders_query = orders_query.filter(OrderRecord.date <= end_date)

    samples = samples_query.all()
    orders = orders_query.all()

    # Reuse the shared helper instead of hand-rolling the same lookups here.
    # (The previous version also built sample_groups/order_groups keyed by
    # cust_id that were never read afterwards -- dead code, removed.)
    order_lookup_by_id, order_lookup_by_name = build_order_lookups(orders)

    # Group samples into unique "projects" keyed by (customer name, PN),
    # remembering every sample date and every ERP cust_id seen in the group.
    unique_sample_groups = {}
    for s in samples:
        key = (normalize_customer_name(s.customer), normalize_pn_for_matching(s.pn))
        group = unique_sample_groups.setdefault(key, {"dates": [], "cust_ids": set()})
        s_date = parse_date(s.date)
        if s_date:
            group["dates"].append(s_date)
        clean_cust_id = s.cust_id.strip().upper() if s.cust_id else ""
        if clean_cust_id:
            group["cust_ids"].add(clean_cust_id)

    velocities = []
    converted_count = 0
    orphan_count = 0
    now = datetime.now()

    for (norm_cust_name, clean_pn), data in unique_sample_groups.items():
        matched_dates = []

        # 1. Match via ERP cust_id (strongest signal).
        for cid in data["cust_ids"]:
            for entry in order_lookup_by_id.get((cid, clean_pn), []):
                matched_dates.append(entry["date"])

        # 2. Fall back to customer-name matching.
        if not matched_dates:
            for entry in order_lookup_by_name.get((norm_cust_name, clean_pn), []):
                matched_dates.append(entry["date"])

        earliest_sample = min(data["dates"]) if data["dates"] else None

        if matched_dates:
            converted_count += 1
            first_order = min(matched_dates)
            if earliest_sample:
                diff = (first_order - earliest_sample).days
                if diff >= 0:  # ignore orders that pre-date the sample
                    velocities.append(diff)
        else:
            # Orphan: no order, and the sample has been out for over 90 days.
            if earliest_sample and (now - earliest_sample).days > 90:
                orphan_count += 1

    total_projects = len(unique_sample_groups)
    avg_velocity = sum(velocities) / len(velocities) if velocities else 0
    conversion_rate = (converted_count / total_projects * 100) if total_projects > 0 else 0

    return LabKPI(
        converted_count=converted_count,
        avg_velocity=round(avg_velocity, 1),
        conversion_rate=round(conversion_rate, 1),
        orphan_count=orphan_count
    )
|
||
|
||
@router.get("/scatter", response_model=List[ScatterPoint])
def get_scatter_data(
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    db: Session = Depends(get_db)
):
    """Return one scatter point per (customer name, PN) sample group.

    x = total sample qty for the group, y = total matched order qty.
    Matching for this chart is by normalized customer name + PN only, so
    the aggregation stays aligned with the name-based grouping. The date
    filters apply to samples only; orders are aggregated over the full
    table, as before. Orders with no corresponding sample are omitted:
    the chart analyses the return on samples actually sent.

    (The previous version first built a ``final_data_map`` -- including a
    by-cust_id order lookup and an "orignal_pn" field -- then discarded it
    entirely in favour of the name-keyed grouping below; that dead first
    pass has been removed.)
    """
    samples_query = db.query(SampleRecord)

    if start_date:
        samples_query = samples_query.filter(SampleRecord.date >= start_date)
    if end_date:
        samples_query = samples_query.filter(SampleRecord.date <= end_date)

    samples = samples_query.all()
    orders = db.query(OrderRecord).all()

    # Aggregate order qty per (normalized customer name, normalized PN).
    order_qty_by_name = {}
    for o in orders:
        key = (normalize_customer_name(o.customer), normalize_pn_for_matching(o.pn))
        order_qty_by_name[key] = order_qty_by_name.get(key, 0) + (o.qty or 0)

    # Aggregate sample qty per group, keeping the first-seen display strings.
    unique_groups = {}
    for s in samples:
        key = (normalize_customer_name(s.customer), normalize_pn_for_matching(s.pn))
        if key not in unique_groups:
            unique_groups[key] = {
                "display_cust": s.customer,
                "display_pn": s.pn,
                "sample_qty": 0,
                "order_qty": 0
            }
        unique_groups[key]["sample_qty"] += (s.qty or 0)

    # Fill in matched order totals; groups without orders stay at 0.
    for key, data in unique_groups.items():
        if key in order_qty_by_name:
            data["order_qty"] = order_qty_by_name[key]

    return [
        ScatterPoint(
            customer=v["display_cust"],
            pn=v["display_pn"],
            sample_qty=v["sample_qty"],
            order_qty=v["order_qty"]
        )
        for v in unique_groups.values()
    ]
|
||
|
||
@router.get("/orphans", response_model=List[OrphanSample])
def get_orphans(db: Session = Depends(get_db)):
    """List samples older than 90 days that never matched any order.

    A sample counts as matched when either (cust_id, PN) or
    (normalized customer name, PN) appears among the orders.
    Sorted with the longest-waiting samples first.
    """
    from app.services.fuzzy_matcher import normalize_pn_for_matching, normalize_customer_name

    now = datetime.now()
    cutoff = now - timedelta(days=90)

    samples = db.query(SampleRecord).all()
    orders = db.query(OrderRecord).all()

    # Build key sets for O(1) membership tests.
    id_keys = set()
    name_keys = set()
    for order in orders:
        pn = normalize_pn_for_matching(order.pn)
        name_keys.add((normalize_customer_name(order.customer), pn))
        cid = order.cust_id.strip().upper() if order.cust_id else ""
        if cid:
            id_keys.add((cid, pn))

    orphans = []
    for sample in samples:
        pn = normalize_pn_for_matching(sample.pn)
        cid = sample.cust_id.strip().upper() if sample.cust_id else ""
        sent_on = parse_date(sample.date)

        # Matched via cust_id, or via customer name as a fallback.
        has_order = (bool(cid) and (cid, pn) in id_keys) or \
                    (normalize_customer_name(sample.customer), pn) in name_keys

        if has_order or sent_on is None or sent_on >= cutoff:
            continue

        orphans.append(OrphanSample(
            customer=sample.customer,
            pn=sample.pn,
            days_since_sent=(now - sent_on).days,
            order_no=sample.order_no,
            date=sample.date
        ))

    orphans.sort(key=lambda x: x.days_since_sent, reverse=True)
    return orphans
|