Files
SalesPipeline/temp_dashboard.py
2026-01-16 18:16:33 +08:00

221 lines
17 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import List
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import func
from pydantic import BaseModel
from app.models import get_db
from app.models.dit import DitRecord
from app.models.sample import SampleRecord
from app.models.order import OrderRecord
from app.models.match import MatchResult, MatchStatus, TargetType
# Router collecting every endpoint of this module under the /dashboard prefix.
router = APIRouter(
    prefix="/dashboard",
    tags=["Dashboard"],
)
class KPIResponse(BaseModel):
    # Response schema for the /kpi endpoint.
    # Every *_rate field is a percentage (0-100); get_kpi rounds them to
    # one decimal place before building the response.
    total_dit: int  # number of DIT records
    sample_rate: float  # share of DITs with a confirmed sample match
    hit_rate: float  # share of DITs with a confirmed order match
    fulfillment_rate: float  # attributed order quantity relative to total EAU
    orphan_sample_rate: float  # share of samples with no match to any DIT
    total_revenue: float  # summed amount of matched orders
class FunnelItem(BaseModel):
    # One stage of the funnel returned by the /funnel endpoint.
    name: str  # label of the stage
    value: int  # count for the stage
    fill: str  # colour string; get_funnel passes hex codes such as '#6366f1'
class AttributionDit(BaseModel):
    # DIT part of an attribution row; populated from a DitRecord.
    op_id: str
    customer: str
    pn: str
    eau: int
    stage: str  # empty string when the record has no stage
    date: str  # empty string when the record has no date
class AttributionSample(BaseModel):
    # Sample part of an attribution row; populated from a SampleRecord.
    order_no: str
    date: str  # empty string when the sample has no date
class AttributionOrder(BaseModel):
    # Order part of an attribution row; populated from an OrderRecord.
    order_no: str
    status: str  # 'Unknown' when the order has no status
    qty: int  # 0 when the order has no quantity
    amount: float  # 0 when the order has no amount
class AttributionRow(BaseModel):
    # One row of the /attribution response: a DIT together with its best
    # sample match, best order match and its attributed order quantity.
    dit: AttributionDit
    sample: AttributionSample | None  # None when no sample is linked
    order: AttributionOrder | None  # None when no order is linked
    match_source: str | None  # taken from the order match, when there is one
    attributed_qty: int  # order quantity attributed to this DIT
    fulfillment_rate: float  # attributed_qty / EAU, as a percentage
def get_lifo_attribution(db: Session):
    """Allocate matched order quantity to DITs, newest DIT first (LIFO).

    Orders that are linked to a DIT through an accepted or auto-matched
    ``MatchResult`` are pooled per ``(customer_normalized, pn)``. DITs are
    then visited in descending ``date`` order and each one draws up to its
    EAU from the pool that shares its key.

    Args:
        db: Active SQLAlchemy session.

    Returns:
        A dict mapping ``DitRecord.id`` to ``{"qty": allocated, "eau": eau}``.
        A missing EAU is treated as 0.
    """
    # 1. All DITs, newest first, so the most recent DIT claims quantity first.
    dits = db.query(DitRecord).order_by(DitRecord.date.desc()).all()

    # 2. Every order that has a confirmed (accepted / auto-matched) link.
    matched_orders = (
        db.query(MatchResult, OrderRecord)
        .join(OrderRecord, MatchResult.target_id == OrderRecord.id)
        .filter(
            MatchResult.target_type == TargetType.ORDER,
            MatchResult.status.in_([MatchStatus.accepted, MatchStatus.auto_matched]),
        )
        .all()
    )

    # 3. Build the quantity pool keyed by (customer, part number).
    #    The join returns one row per match, so an order linked to several
    #    DITs appears several times; count each order only once, otherwise
    #    the pool would be inflated.
    order_pools = {}
    counted_order_ids = set()
    for _match, order in matched_orders:
        if order.id in counted_order_ids:
            continue
        counted_order_ids.add(order.id)
        key = (order.customer_normalized, order.pn)
        order_pools[key] = order_pools.get(key, 0) + (order.qty or 0)

    # 4. Distribute each pool over its DITs in LIFO order.
    attribution_map = {}  # dit_id -> {"qty": allocated, "eau": eau}
    for dit in dits:
        key = (dit.customer_normalized, dit.pn)
        eau = dit.eau or 0
        allocated = 0
        available = order_pools.get(key, 0)
        if available > 0:
            # A DIT never receives more than its own EAU.
            allocated = min(eau, available)
            order_pools[key] = available - allocated
        attribution_map[dit.id] = {
            "qty": allocated,
            "eau": eau,
        }
    return attribution_map
@router.get("/kpi", response_model=KPIResponse)
def get_kpi(db: Session = Depends(get_db)):
    """Return the dashboard KPI statistics (per spec v1.0).

    All rates are percentages rounded to one decimal place. When there are
    no DIT records every figure is reported as 0.

    Args:
        db: SQLAlchemy session injected by FastAPI.

    Returns:
        A ``KPIResponse`` built from the figures computed below.
    """
    total_dit = db.query(DitRecord).count()
    if total_dit == 0:
        return KPIResponse(
            total_dit=0,
            sample_rate=0,
            hit_rate=0,
            fulfillment_rate=0,
            orphan_sample_rate=0,
            total_revenue=0,
        )

    # Match statuses that count as a confirmed link.
    confirmed = [MatchStatus.accepted, MatchStatus.auto_matched]

    # 1. Sample rate: (DITs with a confirmed sample match) / (all DITs).
    dits_with_sample = db.query(func.count(func.distinct(MatchResult.dit_id))).filter(
        MatchResult.target_type == TargetType.SAMPLE,
        MatchResult.status.in_(confirmed),
    ).scalar() or 0
    sample_rate = dits_with_sample / total_dit * 100

    # 2. Hit rate: (DITs with a confirmed order match) / (all DITs).
    dits_with_order = db.query(func.count(func.distinct(MatchResult.dit_id))).filter(
        MatchResult.target_type == TargetType.ORDER,
        MatchResult.status.in_(confirmed),
    ).scalar() or 0
    hit_rate = dits_with_order / total_dit * 100

    # 3. EAU fulfillment rate: (attributed order quantity) / (total DIT EAU).
    attribution_map = get_lifo_attribution(db)
    total_attributed_qty = sum(item['qty'] for item in attribution_map.values())
    total_eau = sum(item['eau'] for item in attribution_map.values())
    fulfillment_rate = (total_attributed_qty / total_eau * 100) if total_eau > 0 else 0

    # 4. Orphan sample rate: (samples without any match) / (all samples).
    # NOTE(review): unlike the other KPIs this query does not filter on
    # match status, so a match in any status counts as "matched" here --
    # confirm that this is intended.
    total_samples = db.query(SampleRecord).count()
    matched_sample_ids = db.query(func.distinct(MatchResult.target_id)).filter(
        MatchResult.target_type == TargetType.SAMPLE
    ).all()
    matched_sample_count = len(matched_sample_ids)
    orphan_sample_rate = (
        (total_samples - matched_sample_count) / total_samples * 100
    ) if total_samples > 0 else 0

    # 5. Total revenue: summed amount of orders with a confirmed match.
    #    An IN-subquery is used instead of a join so that an order matched
    #    to several DITs contributes its amount only once.
    matched_order_ids = db.query(MatchResult.target_id).filter(
        MatchResult.target_type == TargetType.ORDER,
        MatchResult.status.in_(confirmed),
    )
    total_revenue = db.query(func.sum(OrderRecord.amount)).filter(
        OrderRecord.id.in_(matched_order_ids)
    ).scalar() or 0

    return KPIResponse(
        total_dit=total_dit,
        sample_rate=round(sample_rate, 1),
        hit_rate=round(hit_rate, 1),
        fulfillment_rate=round(fulfillment_rate, 1),
        orphan_sample_rate=round(orphan_sample_rate, 1),
        total_revenue=total_revenue,
    )
@router.get("/funnel", response_model=List[FunnelItem])
def get_funnel(db: Session = Depends(get_db)):
    """Return the funnel data: all DITs, DITs with a sample, DITs with an order."""
    confirmed_statuses = [MatchStatus.accepted, MatchStatus.auto_matched]

    def count_dits_linked_to(target_type):
        # Distinct DITs having at least one confirmed match of this type.
        distinct_dits = func.count(func.distinct(MatchResult.dit_id))
        linked = (
            db.query(distinct_dits)
            .filter(
                MatchResult.target_type == target_type,
                MatchResult.status.in_(confirmed_statuses),
            )
            .scalar()
        )
        return linked or 0

    dit_total = db.query(DitRecord).count()
    sampled_total = count_dits_linked_to(TargetType.SAMPLE)
    ordered_total = count_dits_linked_to(TargetType.ORDER)

    # (label, count, colour) for each stage, widest stage first.
    stages = (
        ('DIT 獢辣', dit_total, '#6366f1'),
        ('???見', sampled_total, '#8b5cf6'),
        ('??閮', ordered_total, '#10b981'),
    )
    return [
        FunnelItem(name=label, value=count, fill=colour)
        for label, count, colour in stages
    ]
def _best_confirmed_match(db: Session, dit_id, target_type):
    """Return the highest-scoring accepted/auto-matched match of a DIT, or None."""
    return (
        db.query(MatchResult)
        .filter(
            MatchResult.dit_id == dit_id,
            MatchResult.target_type == target_type,
            MatchResult.status.in_([MatchStatus.accepted, MatchStatus.auto_matched]),
        )
        .order_by(MatchResult.score.desc())
        .first()
    )


@router.get("/attribution", response_model=List[AttributionRow])
def get_attribution(db: Session = Depends(get_db)):
    """Return the attribution detail rows, one per DIT (newest first).

    Each row combines the DIT, its best confirmed sample match, its best
    confirmed order match and the order quantity attributed to it by
    ``get_lifo_attribution``.

    Args:
        db: SQLAlchemy session injected by FastAPI.

    Returns:
        A list of ``AttributionRow`` in descending DIT date order.
    """
    dit_records = db.query(DitRecord).order_by(DitRecord.date.desc()).all()
    attribution_map = get_lifo_attribution(db)

    result = []
    for dit in dit_records:
        # EAU may be missing on the record; the response field is a plain
        # int, so normalise it once and use the same value everywhere.
        eau = dit.eau or 0

        # Sample match: keep only the one with the highest score.
        sample_info = None
        sample_match = _best_confirmed_match(db, dit.id, TargetType.SAMPLE)
        if sample_match:
            sample = db.query(SampleRecord).filter(
                SampleRecord.id == sample_match.target_id
            ).first()
            if sample:
                sample_info = AttributionSample(
                    order_no=sample.order_no,
                    date=sample.date or '',
                )

        # Order match: keep only the one with the highest score.
        order_info = None
        match_source = None
        order_match = _best_confirmed_match(db, dit.id, TargetType.ORDER)
        if order_match:
            order = db.query(OrderRecord).filter(
                OrderRecord.id == order_match.target_id
            ).first()
            if order:
                order_info = AttributionOrder(
                    order_no=order.order_no,
                    status=order.status or 'Unknown',
                    qty=order.qty or 0,
                    amount=order.amount or 0,
                )
                # The source is reported only when the order row exists.
                match_source = order_match.match_source

        attr_data = attribution_map.get(dit.id, {"qty": 0, "eau": eau})
        fulfillment = (
            attr_data['qty'] / attr_data['eau'] * 100
        ) if attr_data['eau'] > 0 else 0

        result.append(AttributionRow(
            dit=AttributionDit(
                op_id=dit.op_id,
                customer=dit.customer,
                pn=dit.pn,
                eau=eau,
                stage=dit.stage or '',
                date=dit.date or '',
            ),
            sample=sample_info,
            order=order_info,
            match_source=match_source,
            attributed_qty=attr_data['qty'],
            fulfillment_rate=round(fulfillment, 1),
        ))
    return result