from typing import List

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import func
from pydantic import BaseModel

from app.models import get_db
from app.models.dit import DitRecord
from app.models.sample import SampleRecord
from app.models.order import OrderRecord
from app.models.match import MatchResult, MatchStatus, TargetType

router = APIRouter(prefix="/dashboard", tags=["Dashboard"])

# Match statuses that every dashboard query below treats as a confirmed link.
_CONFIRMED_STATUSES = (MatchStatus.accepted, MatchStatus.auto_matched)


class KPIResponse(BaseModel):
    """Headline KPI figures returned by ``GET /dashboard/kpi``."""

    total_dit: int
    sample_rate: float  # % of DITs with a confirmed sample match
    hit_rate: float  # % of DITs with a confirmed order match
    fulfillment_rate: float  # attributed order qty / total DIT EAU, in %
    orphan_sample_rate: float  # % of samples with no match row at all
    total_revenue: float


class FunnelItem(BaseModel):
    """One stage of the funnel chart (label, count, fill colour)."""

    name: str
    value: int
    fill: str


class AttributionDit(BaseModel):
    """DIT columns shown on an attribution row."""

    op_id: str
    customer: str
    pn: str
    eau: int
    stage: str
    date: str


class AttributionSample(BaseModel):
    """Sample record linked to a DIT on an attribution row."""

    order_no: str
    date: str


class AttributionOrder(BaseModel):
    """Order record linked to a DIT on an attribution row."""

    order_no: str
    status: str
    qty: int
    amount: float


class AttributionRow(BaseModel):
    """One DIT with its best sample/order match and attributed quantity."""

    dit: AttributionDit
    sample: AttributionSample | None
    order: AttributionOrder | None
    match_source: str | None
    attributed_qty: int
    fulfillment_rate: float


def _count_matched_dits(db: Session, target_type) -> int:
    """Count distinct DITs having a confirmed match of ``target_type``."""
    return (
        db.query(func.count(func.distinct(MatchResult.dit_id)))
        .filter(
            MatchResult.target_type == target_type,
            MatchResult.status.in_(_CONFIRMED_STATUSES),
        )
        .scalar()
        or 0
    )


def _best_matches_with_target(db: Session, target_type, target_model) -> dict:
    """Map dit_id -> (highest-scoring confirmed match, its target row or None).

    Uses an outer join so a match whose target row is missing is still
    returned, paired with ``None``.
    """
    rows = (
        db.query(MatchResult, target_model)
        .outerjoin(target_model, MatchResult.target_id == target_model.id)
        .filter(
            MatchResult.target_type == target_type,
            MatchResult.status.in_(_CONFIRMED_STATUSES),
        )
        .order_by(MatchResult.score.desc())
        .all()
    )
    best = {}
    for match, target in rows:
        # Rows arrive best-score-first, so the first one seen per DIT wins.
        best.setdefault(match.dit_id, (match, target))
    return best


def get_lifo_attribution(db: Session) -> dict:
    """Allocate confirmed order quantity to DITs, newest DIT first.

    Confirmed orders are pooled per ``(customer_normalized, pn)``. DITs are
    then visited in descending ``date`` order and each takes up to its EAU
    from the pool sharing its key, so later-dated DITs are served first.

    Returns:
        dict mapping ``dit.id`` to ``{"qty": allocated, "eau": eau}``.
    """
    # 1. All DITs, newest first (this ordering is the "LIFO" part).
    dits = db.query(DitRecord).order_by(DitRecord.date.desc()).all()

    # 2. Every confirmed order match together with its order row.
    matched_orders = (
        db.query(MatchResult, OrderRecord)
        .join(OrderRecord, MatchResult.target_id == OrderRecord.id)
        .filter(
            MatchResult.target_type == TargetType.ORDER,
            MatchResult.status.in_(_CONFIRMED_STATUSES),
        )
        .all()
    )

    # 3. Build the quantity pool per (customer, part number). The join yields
    #    one row per match, so an order linked to several DITs appears several
    #    times; count each order only once or the pool is inflated.
    order_pools = {}
    seen_order_ids = set()
    for _match, order in matched_orders:
        if order.id in seen_order_ids:
            continue
        seen_order_ids.add(order.id)
        key = (order.customer_normalized, order.pn)
        order_pools[key] = order_pools.get(key, 0) + (order.qty or 0)

    # 4. Hand out pool quantity to DITs in the order fetched above.
    attribution_map = {}  # dit_id -> {qty, eau}
    for dit in dits:
        key = (dit.customer_normalized, dit.pn)
        eau = dit.eau or 0
        allocated = 0
        available = order_pools.get(key, 0)
        if available > 0:
            allocated = min(eau, available)
            order_pools[key] = available - allocated
        attribution_map[dit.id] = {"qty": allocated, "eau": eau}

    return attribution_map


@router.get("/kpi", response_model=KPIResponse)
def get_kpi(db: Session = Depends(get_db)):
    """Return the dashboard KPI summary.

    All rates are percentages rounded to one decimal place. When there are
    no DITs at all, every figure is reported as zero.
    """
    total_dit = db.query(DitRecord).count()
    if total_dit == 0:
        return KPIResponse(
            total_dit=0,
            sample_rate=0,
            hit_rate=0,
            fulfillment_rate=0,
            orphan_sample_rate=0,
            total_revenue=0,
        )

    # 1. Sample rate: DITs with a confirmed sample match / all DITs.
    dits_with_sample = _count_matched_dits(db, TargetType.SAMPLE)
    sample_rate = dits_with_sample / total_dit * 100

    # 2. Hit rate: DITs with a confirmed order match / all DITs.
    dits_with_order = _count_matched_dits(db, TargetType.ORDER)
    hit_rate = dits_with_order / total_dit * 100

    # 3. Fulfillment rate: total attributed order qty / total DIT EAU.
    attribution_map = get_lifo_attribution(db)
    total_attributed_qty = sum(item["qty"] for item in attribution_map.values())
    total_eau = sum(item["eau"] for item in attribution_map.values())
    fulfillment_rate = (total_attributed_qty / total_eau * 100) if total_eau > 0 else 0

    # 4. Orphan sample rate: samples never referenced by a match / all samples.
    # NOTE(review): unlike the other KPIs this does not filter on match
    # status, so a sample with only a non-confirmed match still counts as
    # matched - confirm whether that is intended.
    total_samples = db.query(SampleRecord).count()
    matched_sample_ids = (
        db.query(func.distinct(MatchResult.target_id))
        .filter(MatchResult.target_type == TargetType.SAMPLE)
        .all()
    )
    matched_sample_count = len(matched_sample_ids)
    orphan_sample_rate = (
        (total_samples - matched_sample_count) / total_samples * 100
        if total_samples > 0
        else 0
    )

    # 5. Total revenue of confirmed orders. Select distinct (id, amount) so an
    #    order with several confirmed match rows is summed only once.
    matched_order_amounts = (
        db.query(OrderRecord.id, OrderRecord.amount)
        .join(MatchResult, MatchResult.target_id == OrderRecord.id)
        .filter(
            MatchResult.target_type == TargetType.ORDER,
            MatchResult.status.in_(_CONFIRMED_STATUSES),
        )
        .distinct()
        .all()
    )
    total_revenue = sum(amount or 0 for _order_id, amount in matched_order_amounts)

    return KPIResponse(
        total_dit=total_dit,
        sample_rate=round(sample_rate, 1),
        hit_rate=round(hit_rate, 1),
        fulfillment_rate=round(fulfillment_rate, 1),
        orphan_sample_rate=round(orphan_sample_rate, 1),
        total_revenue=total_revenue,
    )


@router.get("/funnel", response_model=List[FunnelItem])
def get_funnel(db: Session = Depends(get_db)):
    """Return the three funnel stages: all DITs, sampled DITs, ordered DITs."""
    total_dit = db.query(DitRecord).count()
    dits_with_sample = _count_matched_dits(db, TargetType.SAMPLE)
    dits_with_order = _count_matched_dits(db, TargetType.ORDER)

    return [
        FunnelItem(name='DIT 獢辣', value=total_dit, fill='#6366f1'),
        FunnelItem(name='???見', value=dits_with_sample, fill='#8b5cf6'),
        FunnelItem(name='??閮', value=dits_with_order, fill='#10b981'),
    ]


@router.get("/attribution", response_model=List[AttributionRow])
def get_attribution(db: Session = Depends(get_db)):
    """Return one attribution row per DIT, newest DIT first.

    Each row carries the DIT, its highest-scoring confirmed sample and order
    match (if any), and the quantity assigned by ``get_lifo_attribution``.
    """
    dit_records = db.query(DitRecord).order_by(DitRecord.date.desc()).all()
    attribution_map = get_lifo_attribution(db)

    # Fetch the best match per DIT for each target type in one query each,
    # instead of issuing several queries per DIT inside the loop.
    best_samples = _best_matches_with_target(db, TargetType.SAMPLE, SampleRecord)
    best_orders = _best_matches_with_target(db, TargetType.ORDER, OrderRecord)

    result = []
    for dit in dit_records:
        # Best sample match; stays None when no match or target row is gone.
        sample_info = None
        _sample_match, sample = best_samples.get(dit.id, (None, None))
        if sample is not None:
            sample_info = AttributionSample(
                order_no=sample.order_no,
                date=sample.date or '',
            )

        # Best order match; match_source is only reported with a real order.
        order_info = None
        match_source = None
        order_match, order = best_orders.get(dit.id, (None, None))
        if order is not None:
            order_info = AttributionOrder(
                order_no=order.order_no,
                status=order.status or 'Unknown',
                qty=order.qty or 0,
                amount=order.amount or 0,
            )
            match_source = order_match.match_source

        attr_data = attribution_map.get(dit.id, {"qty": 0, "eau": dit.eau or 0})
        fulfillment = (
            attr_data['qty'] / attr_data['eau'] * 100 if attr_data['eau'] > 0 else 0
        )

        result.append(AttributionRow(
            dit=AttributionDit(
                op_id=dit.op_id,
                customer=dit.customer,
                pn=dit.pn,
                # A missing EAU is treated as 0 everywhere else in this file;
                # passing None here would fail validation of the int field.
                eau=dit.eau or 0,
                stage=dit.stage or '',
                date=dit.date or '',
            ),
            sample=sample_info,
            order=order_info,
            match_source=match_source,
            attributed_qty=attr_data['qty'],
            fulfillment_rate=round(fulfillment, 1),
        ))

    return result