Files
DIT_C/services/dit_analyzer.py
DonaldFang 方士碩 44cd2f8e76 feat: DITAnalyzer module - Feature 6.2 & 6.3 implementation
- DITAnalyzer class with data preprocessing
- Feature 6.2: High value resource allocation analysis
- Feature 6.3: Stagnant deal alerts
- Flask API routes for CSV upload and analysis
- Test suite with sample data
2025-12-12 13:12:31 +08:00

324 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
DIT 智能分析模組
解析 DIT CSV 報表,產出行動建議卡片 (Action Cards)
"""
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List, Dict, Optional, Any
class DITAnalyzer:
"""DIT 報表分析器"""
def __init__(self, file_path: Optional[str] = None, dataframe: Optional[pd.DataFrame] = None):
"""
初始化分析器
Args:
file_path: CSV 檔案路徑
dataframe: 或直接傳入 DataFrame
"""
self.df: Optional[pd.DataFrame] = None
self.processed: bool = False
if file_path:
self.load_data(file_path)
elif dataframe is not None:
self.df = dataframe.copy()
self._preprocess()
def load_data(self, file_path: str) -> 'DITAnalyzer':
"""
載入 CSV 資料
Args:
file_path: CSV 檔案路徑
Returns:
self (支援鏈式呼叫)
"""
try:
self.df = pd.read_csv(file_path, encoding='utf-8')
except UnicodeDecodeError:
self.df = pd.read_csv(file_path, encoding='cp950')
except Exception as e:
raise DITAnalyzerError(f"無法載入檔案: {e}")
self._preprocess()
return self
def _preprocess(self) -> None:
"""執行資料清洗與預處理"""
if self.df is None:
raise DITAnalyzerError("尚未載入資料")
# 1. 欄位清洗:移除欄位名稱前後空白
self.df.columns = self.df.columns.str.strip()
# 2. 日期轉換
date_columns = ['Created Date', 'Approved date', 'Close Date']
for col in date_columns:
if col in self.df.columns:
self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
# 3. 數值轉換Total Price
if 'Total Price' in self.df.columns:
self.df['Total Price'] = pd.to_numeric(
self.df['Total Price'].astype(str).str.replace(',', ''),
errors='coerce'
).fillna(0)
# 4. 應用領域推導 (Derived_Application)
self.df['Derived_Application'] = self._derive_application()
# 5. 狀態標記
if 'Stage' in self.df.columns:
self.df['Is_Lost'] = self.df['Stage'].str.contains(
'Lost', case=False, na=False
)
self.df['Is_Active'] = ~self.df['Is_Lost']
else:
self.df['Is_Lost'] = False
self.df['Is_Active'] = True
self.processed = True
def _derive_application(self) -> pd.Series:
"""
推導應用領域
優先順序: Application → Application Detail → Opportunity Name → "Unknown"
"""
def get_app(row):
# 檢查 Application
if 'Application' in row.index:
val = row.get('Application')
if pd.notna(val) and str(val).strip():
return str(val).strip()
# 檢查 Application Detail
if 'Application Detail' in row.index:
val = row.get('Application Detail')
if pd.notna(val) and str(val).strip():
return str(val).strip()
# 檢查 Opportunity Name
if 'Opportunity Name' in row.index:
val = row.get('Opportunity Name')
if pd.notna(val) and str(val).strip():
return str(val).strip()
return "Unknown"
return self.df.apply(get_app, axis=1)
def analyze_resource_allocation(
self,
top_percent: float = 0.2,
low_win_rate: float = 0.1
) -> List[Dict[str, Any]]:
"""
Feature 6.2: 高價值資源分配建議
找出「金礦區」— 金額大但勝率低的應用領域
Args:
top_percent: 金額排名前 X% (預設 20%)
low_win_rate: 勝率門檻 (預設 10%)
Returns:
Action Cards 列表
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
# 依 Derived_Application 分組
grouped = self.df.groupby('Derived_Application').agg({
'Total Price': 'sum',
'Is_Active': 'mean',
'Account Name': lambda x: x.value_counts().head(3).index.tolist()
}).reset_index()
grouped.columns = ['Application', 'Sum_Total_Price', 'Win_Rate', 'Top_Accounts']
# 排序並取 Top 20%
grouped = grouped.sort_values('Sum_Total_Price', ascending=False)
top_n = max(1, int(len(grouped) * top_percent))
top_apps = grouped.head(top_n)
# 篩選勝率低於門檻的
low_win_apps = top_apps[top_apps['Win_Rate'] < low_win_rate]
# 產出 Action Cards
action_cards = []
for _, row in low_win_apps.iterrows():
money_formatted = f"${row['Sum_Total_Price']:,.0f}"
win_rate_pct = f"{row['Win_Rate'] * 100:.1f}"
top_accounts = ', '.join(row['Top_Accounts'][:3]) if row['Top_Accounts'] else ''
action_cards.append({
"type": "resource_allocation",
"title": "高潛力市場攻堅提醒",
"application": row['Application'],
"money": money_formatted,
"money_raw": row['Sum_Total_Price'],
"win_rate": win_rate_pct,
"win_rate_raw": row['Win_Rate'],
"top_accounts": row['Top_Accounts'][:3] if row['Top_Accounts'] else [],
"suggestion": (
f"{row['Application']} 領域潛在商機巨大 ({money_formatted})"
f"但目前勝率偏低 ({win_rate_pct}%)。"
f"建議指派資深 FAE 介入該領域的前三大案子 (如 {top_accounts})。"
)
})
return action_cards
def analyze_stagnant_deals(
self,
threshold_days: int = 60,
reference_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""
Feature 6.3: 呆滯案件警示
針對技術已承認但商務卡關的案子進行催單
Args:
threshold_days: 呆滯天數門檻 (預設 60 天)
reference_date: 參考日期 (預設為當前日期)
Returns:
Action Cards 列表
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
if reference_date is None:
reference_date = datetime.now()
# 檢查必要欄位
if 'Approved date' not in self.df.columns:
return []
# 篩選條件
mask = (
(self.df['Stage'].str.contains('Negotiation', case=False, na=False)) &
(self.df['Approved date'].notna())
)
filtered = self.df[mask].copy()
if filtered.empty:
return []
# 計算呆滯天數
filtered['Days_Since_Approved'] = (
reference_date - filtered['Approved date']
).dt.days
# 篩選超過門檻的
stagnant = filtered[filtered['Days_Since_Approved'] > threshold_days]
# 產出 Action Cards
action_cards = []
for _, row in stagnant.iterrows():
days = int(row['Days_Since_Approved'])
months = days // 30
account = row.get('Account Name', 'Unknown')
project = row.get('Opportunity Name', 'Unknown')
approved_date = row['Approved date'].strftime('%Y-%m-%d') if pd.notna(row['Approved date']) else 'N/A'
action_cards.append({
"type": "stagnant_deal",
"title": "呆滯案件喚醒",
"account": account,
"project": project,
"approved_date": approved_date,
"days_pending": days,
"months_pending": months,
"suggestion": (
f"客戶 {account}{project} 已承認超過 {months} 個月 ({days} 天),仍未轉單。"
f"請業務確認是否為「價格」或「庫存」問題。若無下文,應要求客戶給出 Forecast。"
)
})
# 依天數排序 (最久的在前)
action_cards.sort(key=lambda x: x['days_pending'], reverse=True)
return action_cards
def generate_report(
self,
top_percent: float = 0.2,
low_win_rate: float = 0.1,
threshold_days: int = 60
) -> Dict[str, Any]:
"""
彙整所有分析結果
Args:
top_percent: 高價值分析的金額門檻
low_win_rate: 高價值分析的勝率門檻
threshold_days: 呆滯分析的天數門檻
Returns:
完整分析報告 (Dict)
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
allocation_suggestions = self.analyze_resource_allocation(top_percent, low_win_rate)
stagnant_alerts = self.analyze_stagnant_deals(threshold_days)
# 統計摘要
summary = self._generate_summary()
return {
"generated_at": datetime.now().isoformat(),
"summary": summary,
"action_cards": {
"resource_allocation": allocation_suggestions,
"stagnant_deals": stagnant_alerts
},
"total_alerts": len(allocation_suggestions) + len(stagnant_alerts)
}
def _generate_summary(self) -> Dict[str, Any]:
"""產生統計摘要"""
total_records = len(self.df)
total_value = self.df['Total Price'].sum()
active_count = self.df['Is_Active'].sum()
lost_count = self.df['Is_Lost'].sum()
# 各階段統計
stage_stats = {}
if 'Stage' in self.df.columns:
stage_stats = self.df['Stage'].value_counts().to_dict()
# 應用領域 Top 5
app_stats = self.df.groupby('Derived_Application')['Total Price'].sum()
top_apps = app_stats.nlargest(5).to_dict()
return {
"total_records": total_records,
"total_value": f"${total_value:,.0f}",
"total_value_raw": total_value,
"active_count": int(active_count),
"lost_count": int(lost_count),
"win_rate": f"{(active_count / total_records * 100):.1f}%" if total_records > 0 else "0%",
"stage_distribution": stage_stats,
"top_applications": {k: f"${v:,.0f}" for k, v in top_apps.items()}
}
def get_dataframe(self) -> pd.DataFrame:
"""取得處理後的 DataFrame"""
if self.df is None:
raise DITAnalyzerError("尚未載入資料")
return self.df.copy()
class DITAnalyzerError(Exception):
"""DIT 分析器錯誤"""
pass