feat: DITAnalyzer module - Feature 6.2 & 6.3 implementation

- DITAnalyzer class with data preprocessing
- Feature 6.2: High value resource allocation analysis
- Feature 6.3: Stagnant deal alerts
- Flask API routes for CSV upload and analysis
- Test suite with sample data
This commit is contained in:
2025-12-12 13:12:31 +08:00
parent 177e8e8fe9
commit 44cd2f8e76
6 changed files with 637 additions and 7 deletions

10
app.py
View File

@@ -15,13 +15,9 @@ def create_app(config_class=Config):
# 初始化擴展 # 初始化擴展
db.init_app(app) db.init_app(app)
# 註冊 Blueprint (後續擴展) # 註冊 Blueprint
# from routes.auth import auth_bp from routes.api import api_bp
# from routes.admin import admin_bp app.register_blueprint(api_bp)
# from routes.api import api_bp
# app.register_blueprint(auth_bp)
# app.register_blueprint(admin_bp)
# app.register_blueprint(api_bp)
# 健康檢查端點 # 健康檢查端點
@app.route('/health') @app.route('/health')

View File

@@ -5,3 +5,6 @@ python-dotenv>=1.0.0
pymysql>=1.1.0 pymysql>=1.1.0
cryptography>=41.0.0 cryptography>=41.0.0
requests>=2.31.0 requests>=2.31.0
pandas>=2.0.0
numpy>=1.24.0
werkzeug>=3.0.0

155
routes/api.py Normal file
View File

@@ -0,0 +1,155 @@
"""
DIT 分析 API 路由
"""
import os
import json
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename
from services.dit_analyzer import DITAnalyzer, DITAnalyzerError
api_bp = Blueprint('api', __name__, url_prefix='/api')
ALLOWED_EXTENSIONS = {'csv'}
UPLOAD_FOLDER = 'uploads'
def allowed_file(filename: str) -> bool:
"""檢查檔案副檔名"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@api_bp.route('/analyze', methods=['POST'])
def analyze_dit():
"""
分析 DIT CSV 檔案
接受 multipart/form-data 上傳 CSV
回傳 JSON 格式分析結果
"""
# 檢查檔案
if 'file' not in request.files:
return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "未選擇檔案", "code": "NO_FILENAME"}), 400
if not allowed_file(file.filename):
return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400
# 取得參數
top_percent = float(request.form.get('top_percent', 0.2))
low_win_rate = float(request.form.get('low_win_rate', 0.1))
threshold_days = int(request.form.get('threshold_days', 60))
try:
# 確保上傳目錄存在
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# 儲存檔案
filename = secure_filename(file.filename)
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
# 執行分析
analyzer = DITAnalyzer(filepath)
report = analyzer.generate_report(
top_percent=top_percent,
low_win_rate=low_win_rate,
threshold_days=threshold_days
)
# 清理暫存檔
os.remove(filepath)
return jsonify({
"status": "success",
"data": report
})
except DITAnalyzerError as e:
return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400
except Exception as e:
return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500
@api_bp.route('/analyze/resource-allocation', methods=['POST'])
def analyze_resource_allocation():
"""
僅執行 Feature 6.2: 高價值資源分配分析
"""
if 'file' not in request.files:
return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400
file = request.files['file']
if not allowed_file(file.filename):
return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400
top_percent = float(request.form.get('top_percent', 0.2))
low_win_rate = float(request.form.get('low_win_rate', 0.1))
try:
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
filename = secure_filename(file.filename)
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
analyzer = DITAnalyzer(filepath)
results = analyzer.analyze_resource_allocation(top_percent, low_win_rate)
os.remove(filepath)
return jsonify({
"status": "success",
"data": {
"type": "resource_allocation",
"count": len(results),
"action_cards": results
}
})
except DITAnalyzerError as e:
return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400
except Exception as e:
return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500
@api_bp.route('/analyze/stagnant-deals', methods=['POST'])
def analyze_stagnant_deals():
"""
僅執行 Feature 6.3: 呆滯案件警示
"""
if 'file' not in request.files:
return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400
file = request.files['file']
if not allowed_file(file.filename):
return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400
threshold_days = int(request.form.get('threshold_days', 60))
try:
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
filename = secure_filename(file.filename)
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
analyzer = DITAnalyzer(filepath)
results = analyzer.analyze_stagnant_deals(threshold_days)
os.remove(filepath)
return jsonify({
"status": "success",
"data": {
"type": "stagnant_deals",
"count": len(results),
"action_cards": results
}
})
except DITAnalyzerError as e:
return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400
except Exception as e:
return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500

323
services/dit_analyzer.py Normal file
View File

@@ -0,0 +1,323 @@
"""
DIT 智能分析模組
解析 DIT CSV 報表,產出行動建議卡片 (Action Cards)
"""
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List, Dict, Optional, Any
class DITAnalyzer:
"""DIT 報表分析器"""
def __init__(self, file_path: Optional[str] = None, dataframe: Optional[pd.DataFrame] = None):
"""
初始化分析器
Args:
file_path: CSV 檔案路徑
dataframe: 或直接傳入 DataFrame
"""
self.df: Optional[pd.DataFrame] = None
self.processed: bool = False
if file_path:
self.load_data(file_path)
elif dataframe is not None:
self.df = dataframe.copy()
self._preprocess()
def load_data(self, file_path: str) -> 'DITAnalyzer':
"""
載入 CSV 資料
Args:
file_path: CSV 檔案路徑
Returns:
self (支援鏈式呼叫)
"""
try:
self.df = pd.read_csv(file_path, encoding='utf-8')
except UnicodeDecodeError:
self.df = pd.read_csv(file_path, encoding='cp950')
except Exception as e:
raise DITAnalyzerError(f"無法載入檔案: {e}")
self._preprocess()
return self
def _preprocess(self) -> None:
"""執行資料清洗與預處理"""
if self.df is None:
raise DITAnalyzerError("尚未載入資料")
# 1. 欄位清洗:移除欄位名稱前後空白
self.df.columns = self.df.columns.str.strip()
# 2. 日期轉換
date_columns = ['Created Date', 'Approved date', 'Close Date']
for col in date_columns:
if col in self.df.columns:
self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
# 3. 數值轉換Total Price
if 'Total Price' in self.df.columns:
self.df['Total Price'] = pd.to_numeric(
self.df['Total Price'].astype(str).str.replace(',', ''),
errors='coerce'
).fillna(0)
# 4. 應用領域推導 (Derived_Application)
self.df['Derived_Application'] = self._derive_application()
# 5. 狀態標記
if 'Stage' in self.df.columns:
self.df['Is_Lost'] = self.df['Stage'].str.contains(
'Lost', case=False, na=False
)
self.df['Is_Active'] = ~self.df['Is_Lost']
else:
self.df['Is_Lost'] = False
self.df['Is_Active'] = True
self.processed = True
def _derive_application(self) -> pd.Series:
"""
推導應用領域
優先順序: Application → Application Detail → Opportunity Name → "Unknown"
"""
def get_app(row):
# 檢查 Application
if 'Application' in row.index:
val = row.get('Application')
if pd.notna(val) and str(val).strip():
return str(val).strip()
# 檢查 Application Detail
if 'Application Detail' in row.index:
val = row.get('Application Detail')
if pd.notna(val) and str(val).strip():
return str(val).strip()
# 檢查 Opportunity Name
if 'Opportunity Name' in row.index:
val = row.get('Opportunity Name')
if pd.notna(val) and str(val).strip():
return str(val).strip()
return "Unknown"
return self.df.apply(get_app, axis=1)
def analyze_resource_allocation(
self,
top_percent: float = 0.2,
low_win_rate: float = 0.1
) -> List[Dict[str, Any]]:
"""
Feature 6.2: 高價值資源分配建議
找出「金礦區」— 金額大但勝率低的應用領域
Args:
top_percent: 金額排名前 X% (預設 20%)
low_win_rate: 勝率門檻 (預設 10%)
Returns:
Action Cards 列表
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
# 依 Derived_Application 分組
grouped = self.df.groupby('Derived_Application').agg({
'Total Price': 'sum',
'Is_Active': 'mean',
'Account Name': lambda x: x.value_counts().head(3).index.tolist()
}).reset_index()
grouped.columns = ['Application', 'Sum_Total_Price', 'Win_Rate', 'Top_Accounts']
# 排序並取 Top 20%
grouped = grouped.sort_values('Sum_Total_Price', ascending=False)
top_n = max(1, int(len(grouped) * top_percent))
top_apps = grouped.head(top_n)
# 篩選勝率低於門檻的
low_win_apps = top_apps[top_apps['Win_Rate'] < low_win_rate]
# 產出 Action Cards
action_cards = []
for _, row in low_win_apps.iterrows():
money_formatted = f"${row['Sum_Total_Price']:,.0f}"
win_rate_pct = f"{row['Win_Rate'] * 100:.1f}"
top_accounts = ', '.join(row['Top_Accounts'][:3]) if row['Top_Accounts'] else ''
action_cards.append({
"type": "resource_allocation",
"title": "高潛力市場攻堅提醒",
"application": row['Application'],
"money": money_formatted,
"money_raw": row['Sum_Total_Price'],
"win_rate": win_rate_pct,
"win_rate_raw": row['Win_Rate'],
"top_accounts": row['Top_Accounts'][:3] if row['Top_Accounts'] else [],
"suggestion": (
f"{row['Application']} 領域潛在商機巨大 ({money_formatted})"
f"但目前勝率偏低 ({win_rate_pct}%)。"
f"建議指派資深 FAE 介入該領域的前三大案子 (如 {top_accounts})。"
)
})
return action_cards
def analyze_stagnant_deals(
self,
threshold_days: int = 60,
reference_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""
Feature 6.3: 呆滯案件警示
針對技術已承認但商務卡關的案子進行催單
Args:
threshold_days: 呆滯天數門檻 (預設 60 天)
reference_date: 參考日期 (預設為當前日期)
Returns:
Action Cards 列表
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
if reference_date is None:
reference_date = datetime.now()
# 檢查必要欄位
if 'Approved date' not in self.df.columns:
return []
# 篩選條件
mask = (
(self.df['Stage'].str.contains('Negotiation', case=False, na=False)) &
(self.df['Approved date'].notna())
)
filtered = self.df[mask].copy()
if filtered.empty:
return []
# 計算呆滯天數
filtered['Days_Since_Approved'] = (
reference_date - filtered['Approved date']
).dt.days
# 篩選超過門檻的
stagnant = filtered[filtered['Days_Since_Approved'] > threshold_days]
# 產出 Action Cards
action_cards = []
for _, row in stagnant.iterrows():
days = int(row['Days_Since_Approved'])
months = days // 30
account = row.get('Account Name', 'Unknown')
project = row.get('Opportunity Name', 'Unknown')
approved_date = row['Approved date'].strftime('%Y-%m-%d') if pd.notna(row['Approved date']) else 'N/A'
action_cards.append({
"type": "stagnant_deal",
"title": "呆滯案件喚醒",
"account": account,
"project": project,
"approved_date": approved_date,
"days_pending": days,
"months_pending": months,
"suggestion": (
f"客戶 {account}{project} 已承認超過 {months} 個月 ({days} 天),仍未轉單。"
f"請業務確認是否為「價格」或「庫存」問題。若無下文,應要求客戶給出 Forecast。"
)
})
# 依天數排序 (最久的在前)
action_cards.sort(key=lambda x: x['days_pending'], reverse=True)
return action_cards
def generate_report(
self,
top_percent: float = 0.2,
low_win_rate: float = 0.1,
threshold_days: int = 60
) -> Dict[str, Any]:
"""
彙整所有分析結果
Args:
top_percent: 高價值分析的金額門檻
low_win_rate: 高價值分析的勝率門檻
threshold_days: 呆滯分析的天數門檻
Returns:
完整分析報告 (Dict)
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
allocation_suggestions = self.analyze_resource_allocation(top_percent, low_win_rate)
stagnant_alerts = self.analyze_stagnant_deals(threshold_days)
# 統計摘要
summary = self._generate_summary()
return {
"generated_at": datetime.now().isoformat(),
"summary": summary,
"action_cards": {
"resource_allocation": allocation_suggestions,
"stagnant_deals": stagnant_alerts
},
"total_alerts": len(allocation_suggestions) + len(stagnant_alerts)
}
def _generate_summary(self) -> Dict[str, Any]:
"""產生統計摘要"""
total_records = len(self.df)
total_value = self.df['Total Price'].sum()
active_count = self.df['Is_Active'].sum()
lost_count = self.df['Is_Lost'].sum()
# 各階段統計
stage_stats = {}
if 'Stage' in self.df.columns:
stage_stats = self.df['Stage'].value_counts().to_dict()
# 應用領域 Top 5
app_stats = self.df.groupby('Derived_Application')['Total Price'].sum()
top_apps = app_stats.nlargest(5).to_dict()
return {
"total_records": total_records,
"total_value": f"${total_value:,.0f}",
"total_value_raw": total_value,
"active_count": int(active_count),
"lost_count": int(lost_count),
"win_rate": f"{(active_count / total_records * 100):.1f}%" if total_records > 0 else "0%",
"stage_distribution": stage_stats,
"top_applications": {k: f"${v:,.0f}" for k, v in top_apps.items()}
}
def get_dataframe(self) -> pd.DataFrame:
"""取得處理後的 DataFrame"""
if self.df is None:
raise DITAnalyzerError("尚未載入資料")
return self.df.copy()
class DITAnalyzerError(Exception):
"""DIT 分析器錯誤"""
pass

0
tests/__init__.py Normal file
View File

153
tests/test_dit_analyzer.py Normal file
View File

@@ -0,0 +1,153 @@
"""
DITAnalyzer Test Script
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pandas as pd
from datetime import datetime, timedelta
from services.dit_analyzer import DITAnalyzer, DITAnalyzerError
def create_sample_data():
"""Create sample test data"""
days_ago_90 = datetime.now() - timedelta(days=90)
days_ago_30 = datetime.now() - timedelta(days=30)
data = {
'Created Date': ['2024-01-01', '2024-02-01', '2024-03-01', '2024-04-01', '2024-05-01',
'2024-01-15', '2024-02-15', '2024-03-15', '2024-04-15', '2024-05-15'],
'Account Name': ['CustomerA', 'CustomerB', 'CustomerA', 'CustomerC', 'CustomerD',
'CustomerE', 'CustomerF', 'CustomerA', 'CustomerG', 'CustomerH'],
'Stage': ['Won', 'Opportunity Lost', 'Negotiation', 'Won', 'Design-Lost',
'Negotiation', 'Mass Production', 'Opportunity Lost', 'Negotiation', 'Won'],
'Application': ['Automotive', '', 'Automotive', 'IoT', '',
'Automotive', 'Consumer', '', 'Industrial', 'Automotive'],
'Application Detail': ['', 'Consumer Electronics', '', '', 'Smart Home',
'', '', 'Power Supply', '', ''],
'Opportunity Name': ['Project Alpha', 'Project Beta', 'Project Gamma', 'Project Delta', 'Project Epsilon',
'Project Zeta', 'Project Eta', 'Project Theta', 'Project Iota', 'Project Kappa'],
'Total Price': [500000, 300000, 800000, 150000, 200000,
1200000, 50000, 100000, 450000, 600000],
'Approved date': [None, None, days_ago_90.strftime('%Y-%m-%d'), None, None,
days_ago_90.strftime('%Y-%m-%d'), None, None, days_ago_30.strftime('%Y-%m-%d'), None],
'Lost Type': ['', 'Price', '', '', 'Spec',
'', '', 'Price', '', '']
}
return pd.DataFrame(data)
def test_preprocess():
"""Test data preprocessing"""
print("=" * 50)
print("Test 1: Data Preprocessing")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
processed_df = analyzer.get_dataframe()
print(f"Total records: {len(processed_df)}")
print(f"Columns: {list(processed_df.columns)}")
assert 'Derived_Application' in processed_df.columns
assert 'Is_Lost' in processed_df.columns
assert 'Is_Active' in processed_df.columns
print("\n[PASS] Preprocess test passed!")
def test_resource_allocation():
"""Test Feature 6.2: High Value Resource Allocation"""
print("\n" + "=" * 50)
print("Test 2: Feature 6.2 Resource Allocation")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
results = analyzer.analyze_resource_allocation(top_percent=0.5, low_win_rate=0.5)
print(f"Found {len(results)} high-value low-win-rate applications")
for card in results:
print(f"\n[CARD] {card['title']}")
print(f" Application: {card['application']}")
print(f" Potential Value: {card['money']}")
print(f" Win Rate: {card['win_rate']}%")
print("\n[PASS] Resource allocation test passed!")
def test_stagnant_deals():
"""Test Feature 6.3: Stagnant Deal Alert"""
print("\n" + "=" * 50)
print("Test 3: Feature 6.3 Stagnant Deals")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
results = analyzer.analyze_stagnant_deals(threshold_days=60)
print(f"Found {len(results)} stagnant deals")
for card in results:
print(f"\n[ALERT] {card['title']}")
print(f" Account: {card['account']}")
print(f" Project: {card['project']}")
print(f" Days Pending: {card['days_pending']}")
print("\n[PASS] Stagnant deals test passed!")
def test_full_report():
"""Test full report generation"""
print("\n" + "=" * 50)
print("Test 4: Full Report Generation")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
report = analyzer.generate_report(top_percent=0.5, low_win_rate=0.5, threshold_days=60)
print(f"\n[REPORT] Generated at: {report['generated_at']}")
summary = report['summary']
print(f" Total Records: {summary['total_records']}")
print(f" Total Value: {summary['total_value']}")
print(f" Win Rate: {summary['win_rate']}")
print(f"\n[ACTION CARDS] Total: {report['total_alerts']}")
print(f" - Resource Allocation: {len(report['action_cards']['resource_allocation'])}")
print(f" - Stagnant Deals: {len(report['action_cards']['stagnant_deals'])}")
print("\n[PASS] Full report test passed!")
def main():
"""Run all tests"""
print("\n[START] DITAnalyzer Test Suite\n")
try:
test_preprocess()
test_resource_allocation()
test_stagnant_deals()
test_full_report()
print("\n" + "=" * 50)
print("[SUCCESS] All tests passed!")
print("=" * 50)
except Exception as e:
print(f"\n[FAIL] Test failed: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
exit(main())