From 44cd2f8e76707fb0b49f15c4817b52d63f5e62d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?DonaldFang=20=E6=96=B9=E5=A3=AB=E7=A2=A9?= Date: Fri, 12 Dec 2025 13:12:31 +0800 Subject: [PATCH] feat: DITAnalyzer module - Feature 6.2 & 6.3 implementation - DITAnalyzer class with data preprocessing - Feature 6.2: High value resource allocation analysis - Feature 6.3: Stagnant deal alerts - Flask API routes for CSV upload and analysis - Test suite with sample data --- app.py | 10 +- requirements.txt | 3 + routes/api.py | 155 ++++++++++++++++++ services/dit_analyzer.py | 323 +++++++++++++++++++++++++++++++++++++ tests/__init__.py | 0 tests/test_dit_analyzer.py | 153 ++++++++++++++++++ 6 files changed, 637 insertions(+), 7 deletions(-) create mode 100644 routes/api.py create mode 100644 services/dit_analyzer.py create mode 100644 tests/__init__.py create mode 100644 tests/test_dit_analyzer.py diff --git a/app.py b/app.py index 593bc27..80fdff3 100644 --- a/app.py +++ b/app.py @@ -15,13 +15,9 @@ def create_app(config_class=Config): # 初始化擴展 db.init_app(app) - # 註冊 Blueprint (後續擴展) - # from routes.auth import auth_bp - # from routes.admin import admin_bp - # from routes.api import api_bp - # app.register_blueprint(auth_bp) - # app.register_blueprint(admin_bp) - # app.register_blueprint(api_bp) + # 註冊 Blueprint + from routes.api import api_bp + app.register_blueprint(api_bp) # 健康檢查端點 @app.route('/health') diff --git a/requirements.txt b/requirements.txt index 633b3ee..a5ad39e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,6 @@ python-dotenv>=1.0.0 pymysql>=1.1.0 cryptography>=41.0.0 requests>=2.31.0 +pandas>=2.0.0 +numpy>=1.24.0 +werkzeug>=3.0.0 diff --git a/routes/api.py b/routes/api.py new file mode 100644 index 0000000..ce238dc --- /dev/null +++ b/routes/api.py @@ -0,0 +1,155 @@ +""" +DIT 分析 API 路由 +""" + +import os +import json +from flask import Blueprint, request, jsonify, current_app +from werkzeug.utils import secure_filename +from services.dit_analyzer import DITAnalyzer, DITAnalyzerError + +api_bp = Blueprint('api', __name__, url_prefix='/api') + +ALLOWED_EXTENSIONS = {'csv'} +UPLOAD_FOLDER = 'uploads' + + +def allowed_file(filename: str) -> bool: + """檢查檔案副檔名""" + return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS + + +@api_bp.route('/analyze', methods=['POST']) +def analyze_dit(): + """ + 分析 DIT CSV 檔案 + + 接受 multipart/form-data 上傳 CSV + 回傳 JSON 格式分析結果 + """ + # 檢查檔案 + if 'file' not in request.files: + return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400 + + file = request.files['file'] + if file.filename == '': + return jsonify({"error": "未選擇檔案", "code": "NO_FILENAME"}), 400 + + if not allowed_file(file.filename): + return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400 + + # 取得參數 + top_percent = float(request.form.get('top_percent', 0.2)) + low_win_rate = float(request.form.get('low_win_rate', 0.1)) + threshold_days = int(request.form.get('threshold_days', 60)) + + try: + # 確保上傳目錄存在 + os.makedirs(UPLOAD_FOLDER, exist_ok=True) + + # 儲存檔案 + filename = secure_filename(file.filename) + filepath = os.path.join(UPLOAD_FOLDER, filename) + file.save(filepath) + + # 執行分析 + analyzer = DITAnalyzer(filepath) + report = analyzer.generate_report( + top_percent=top_percent, + low_win_rate=low_win_rate, + threshold_days=threshold_days + ) + + # 清理暫存檔 + os.remove(filepath) + + return jsonify({ + "status": "success", + "data": report + }) + + except DITAnalyzerError as e: + return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400 + except Exception as e: + return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500 + + +@api_bp.route('/analyze/resource-allocation', methods=['POST']) +def analyze_resource_allocation(): + """ + 僅執行 Feature 6.2: 高價值資源分配分析 + """ + if 'file' not in request.files: + return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400 + + file = request.files['file'] + if not allowed_file(file.filename): + return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400 + + top_percent = float(request.form.get('top_percent', 0.2)) + low_win_rate = float(request.form.get('low_win_rate', 0.1)) + + try: + os.makedirs(UPLOAD_FOLDER, exist_ok=True) + filename = secure_filename(file.filename) + filepath = os.path.join(UPLOAD_FOLDER, filename) + file.save(filepath) + + analyzer = DITAnalyzer(filepath) + results = analyzer.analyze_resource_allocation(top_percent, low_win_rate) + + os.remove(filepath) + + return jsonify({ + "status": "success", + "data": { + "type": "resource_allocation", + "count": len(results), + "action_cards": results + } + }) + + except DITAnalyzerError as e: + return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400 + except Exception as e: + return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500 + + +@api_bp.route('/analyze/stagnant-deals', methods=['POST']) +def analyze_stagnant_deals(): + """ + 僅執行 Feature 6.3: 呆滯案件警示 + """ + if 'file' not in request.files: + return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400 + + file = request.files['file'] + if not allowed_file(file.filename): + return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400 + + threshold_days = int(request.form.get('threshold_days', 60)) + + try: + os.makedirs(UPLOAD_FOLDER, exist_ok=True) + filename = secure_filename(file.filename) + filepath = os.path.join(UPLOAD_FOLDER, filename) + file.save(filepath) + + analyzer = DITAnalyzer(filepath) + results = analyzer.analyze_stagnant_deals(threshold_days) + + os.remove(filepath) + + return jsonify({ + "status": "success", + "data": { + "type": "stagnant_deals", + "count": len(results), + "action_cards": results + } + }) + + except DITAnalyzerError as e: + return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400 + except Exception as e: + return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500 diff --git a/services/dit_analyzer.py b/services/dit_analyzer.py new file mode 100644 index 0000000..d4029d8 --- /dev/null +++ b/services/dit_analyzer.py @@ -0,0 +1,323 @@ +""" +DIT 智能分析模組 +解析 DIT CSV 報表,產出行動建議卡片 (Action Cards) +""" + +import pandas as pd +import numpy as np +from datetime import datetime +from typing import List, Dict, Optional, Any + + +class DITAnalyzer: + """DIT 報表分析器""" + + def __init__(self, file_path: Optional[str] = None, dataframe: Optional[pd.DataFrame] = None): + """ + 初始化分析器 + + Args: + file_path: CSV 檔案路徑 + dataframe: 或直接傳入 DataFrame + """ + self.df: Optional[pd.DataFrame] = None + self.processed: bool = False + + if file_path: + self.load_data(file_path) + elif dataframe is not None: + self.df = dataframe.copy() + self._preprocess() + + def load_data(self, file_path: str) -> 'DITAnalyzer': + """ + 載入 CSV 資料 + + Args: + file_path: CSV 檔案路徑 + + Returns: + self (支援鏈式呼叫) + """ + try: + self.df = pd.read_csv(file_path, encoding='utf-8') + except UnicodeDecodeError: + self.df = pd.read_csv(file_path, encoding='cp950') + except Exception as e: + raise DITAnalyzerError(f"無法載入檔案: {e}") + + self._preprocess() + return self + + def _preprocess(self) -> None: + """執行資料清洗與預處理""" + if self.df is None: + raise DITAnalyzerError("尚未載入資料") + + # 1. 欄位清洗:移除欄位名稱前後空白 + self.df.columns = self.df.columns.str.strip() + + # 2. 日期轉換 + date_columns = ['Created Date', 'Approved date', 'Close Date'] + for col in date_columns: + if col in self.df.columns: + self.df[col] = pd.to_datetime(self.df[col], errors='coerce') + + # 3. 數值轉換:Total Price + if 'Total Price' in self.df.columns: + self.df['Total Price'] = pd.to_numeric( + self.df['Total Price'].astype(str).str.replace(',', ''), + errors='coerce' + ).fillna(0) + + # 4. 應用領域推導 (Derived_Application) + self.df['Derived_Application'] = self._derive_application() + + # 5. 狀態標記 + if 'Stage' in self.df.columns: + self.df['Is_Lost'] = self.df['Stage'].str.contains( + 'Lost', case=False, na=False + ) + self.df['Is_Active'] = ~self.df['Is_Lost'] + else: + self.df['Is_Lost'] = False + self.df['Is_Active'] = True + + self.processed = True + + def _derive_application(self) -> pd.Series: + """ + 推導應用領域 + 優先順序: Application → Application Detail → Opportunity Name → "Unknown" + """ + def get_app(row): + # 檢查 Application + if 'Application' in row.index: + val = row.get('Application') + if pd.notna(val) and str(val).strip(): + return str(val).strip() + + # 檢查 Application Detail + if 'Application Detail' in row.index: + val = row.get('Application Detail') + if pd.notna(val) and str(val).strip(): + return str(val).strip() + + # 檢查 Opportunity Name + if 'Opportunity Name' in row.index: + val = row.get('Opportunity Name') + if pd.notna(val) and str(val).strip(): + return str(val).strip() + + return "Unknown" + + return self.df.apply(get_app, axis=1) + + def analyze_resource_allocation( + self, + top_percent: float = 0.2, + low_win_rate: float = 0.1 + ) -> List[Dict[str, Any]]: + """ + Feature 6.2: 高價值資源分配建議 + + 找出「金礦區」— 金額大但勝率低的應用領域 + + Args: + top_percent: 金額排名前 X% (預設 20%) + low_win_rate: 勝率門檻 (預設 10%) + + Returns: + Action Cards 列表 + """ + if not self.processed: + raise DITAnalyzerError("資料尚未預處理") + + # 依 Derived_Application 分組 + grouped = self.df.groupby('Derived_Application').agg({ + 'Total Price': 'sum', + 'Is_Active': 'mean', + 'Account Name': lambda x: x.value_counts().head(3).index.tolist() + }).reset_index() + + grouped.columns = ['Application', 'Sum_Total_Price', 'Win_Rate', 'Top_Accounts'] + + # 排序並取 Top 20% + grouped = grouped.sort_values('Sum_Total_Price', ascending=False) + top_n = max(1, int(len(grouped) * top_percent)) + top_apps = grouped.head(top_n) + + # 篩選勝率低於門檻的 + low_win_apps = top_apps[top_apps['Win_Rate'] < low_win_rate] + + # 產出 Action Cards + action_cards = [] + for _, row in low_win_apps.iterrows(): + money_formatted = f"${row['Sum_Total_Price']:,.0f}" + win_rate_pct = f"{row['Win_Rate'] * 100:.1f}" + top_accounts = ', '.join(row['Top_Accounts'][:3]) if row['Top_Accounts'] else '無' + + action_cards.append({ + "type": "resource_allocation", + "title": "高潛力市場攻堅提醒", + "application": row['Application'], + "money": money_formatted, + "money_raw": row['Sum_Total_Price'], + "win_rate": win_rate_pct, + "win_rate_raw": row['Win_Rate'], + "top_accounts": row['Top_Accounts'][:3] if row['Top_Accounts'] else [], + "suggestion": ( + f"{row['Application']} 領域潛在商機巨大 ({money_formatted})," + f"但目前勝率偏低 ({win_rate_pct}%)。" + f"建議指派資深 FAE 介入該領域的前三大案子 (如 {top_accounts})。" + ) + }) + + return action_cards + + def analyze_stagnant_deals( + self, + threshold_days: int = 60, + reference_date: Optional[datetime] = None + ) -> List[Dict[str, Any]]: + """ + Feature 6.3: 呆滯案件警示 + + 針對技術已承認但商務卡關的案子進行催單 + + Args: + threshold_days: 呆滯天數門檻 (預設 60 天) + reference_date: 參考日期 (預設為當前日期) + + Returns: + Action Cards 列表 + """ + if not self.processed: + raise DITAnalyzerError("資料尚未預處理") + + if reference_date is None: + reference_date = datetime.now() + + # 檢查必要欄位 + if 'Approved date' not in self.df.columns: + return [] + + # 篩選條件 + mask = ( + (self.df['Stage'].str.contains('Negotiation', case=False, na=False)) & + (self.df['Approved date'].notna()) + ) + filtered = self.df[mask].copy() + + if filtered.empty: + return [] + + # 計算呆滯天數 + filtered['Days_Since_Approved'] = ( + reference_date - filtered['Approved date'] + ).dt.days + + # 篩選超過門檻的 + stagnant = filtered[filtered['Days_Since_Approved'] > threshold_days] + + # 產出 Action Cards + action_cards = [] + for _, row in stagnant.iterrows(): + days = int(row['Days_Since_Approved']) + months = days // 30 + account = row.get('Account Name', 'Unknown') + project = row.get('Opportunity Name', 'Unknown') + approved_date = row['Approved date'].strftime('%Y-%m-%d') if pd.notna(row['Approved date']) else 'N/A' + + action_cards.append({ + "type": "stagnant_deal", + "title": "呆滯案件喚醒", + "account": account, + "project": project, + "approved_date": approved_date, + "days_pending": days, + "months_pending": months, + "suggestion": ( + f"客戶 {account} 的 {project} 已承認超過 {months} 個月 ({days} 天),仍未轉單。" + f"請業務確認是否為「價格」或「庫存」問題。若無下文,應要求客戶給出 Forecast。" + ) + }) + + # 依天數排序 (最久的在前) + action_cards.sort(key=lambda x: x['days_pending'], reverse=True) + + return action_cards + + def generate_report( + self, + top_percent: float = 0.2, + low_win_rate: float = 0.1, + threshold_days: int = 60 + ) -> Dict[str, Any]: + """ + 彙整所有分析結果 + + Args: + top_percent: 高價值分析的金額門檻 + low_win_rate: 高價值分析的勝率門檻 + threshold_days: 呆滯分析的天數門檻 + + Returns: + 完整分析報告 (Dict) + """ + if not self.processed: + raise DITAnalyzerError("資料尚未預處理") + + allocation_suggestions = self.analyze_resource_allocation(top_percent, low_win_rate) + stagnant_alerts = self.analyze_stagnant_deals(threshold_days) + + # 統計摘要 + summary = self._generate_summary() + + return { + "generated_at": datetime.now().isoformat(), + "summary": summary, + "action_cards": { + "resource_allocation": allocation_suggestions, + "stagnant_deals": stagnant_alerts + }, + "total_alerts": len(allocation_suggestions) + len(stagnant_alerts) + } + + def _generate_summary(self) -> Dict[str, Any]: + """產生統計摘要""" + total_records = len(self.df) + total_value = self.df['Total Price'].sum() + active_count = self.df['Is_Active'].sum() + lost_count = self.df['Is_Lost'].sum() + + # 各階段統計 + stage_stats = {} + if 'Stage' in self.df.columns: + stage_stats = self.df['Stage'].value_counts().to_dict() + + # 應用領域 Top 5 + app_stats = self.df.groupby('Derived_Application')['Total Price'].sum() + top_apps = app_stats.nlargest(5).to_dict() + + return { + "total_records": total_records, + "total_value": f"${total_value:,.0f}", + "total_value_raw": total_value, + "active_count": int(active_count), + "lost_count": int(lost_count), + "win_rate": f"{(active_count / total_records * 100):.1f}%" if total_records > 0 else "0%", + "stage_distribution": stage_stats, + "top_applications": {k: f"${v:,.0f}" for k, v in top_apps.items()} + } + + def get_dataframe(self) -> pd.DataFrame: + """取得處理後的 DataFrame""" + if self.df is None: + raise DITAnalyzerError("尚未載入資料") + return self.df.copy() + + +class DITAnalyzerError(Exception): + """DIT 分析器錯誤""" + pass diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_dit_analyzer.py b/tests/test_dit_analyzer.py new file mode 100644 index 0000000..8bbaf77 --- /dev/null +++ b/tests/test_dit_analyzer.py @@ -0,0 +1,153 @@ +""" +DITAnalyzer Test Script +""" + +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import pandas as pd +from datetime import datetime, timedelta +from services.dit_analyzer import DITAnalyzer, DITAnalyzerError + + +def create_sample_data(): + """Create sample test data""" + days_ago_90 = datetime.now() - timedelta(days=90) + days_ago_30 = datetime.now() - timedelta(days=30) + + data = { + 'Created Date': ['2024-01-01', '2024-02-01', '2024-03-01', '2024-04-01', '2024-05-01', + '2024-01-15', '2024-02-15', '2024-03-15', '2024-04-15', '2024-05-15'], + 'Account Name': ['CustomerA', 'CustomerB', 'CustomerA', 'CustomerC', 'CustomerD', + 'CustomerE', 'CustomerF', 'CustomerA', 'CustomerG', 'CustomerH'], + 'Stage': ['Won', 'Opportunity Lost', 'Negotiation', 'Won', 'Design-Lost', + 'Negotiation', 'Mass Production', 'Opportunity Lost', 'Negotiation', 'Won'], + 'Application': ['Automotive', '', 'Automotive', 'IoT', '', + 'Automotive', 'Consumer', '', 'Industrial', 'Automotive'], + 'Application Detail': ['', 'Consumer Electronics', '', '', 'Smart Home', + '', '', 'Power Supply', '', ''], + 'Opportunity Name': ['Project Alpha', 'Project Beta', 'Project Gamma', 'Project Delta', 'Project Epsilon', + 'Project Zeta', 'Project Eta', 'Project Theta', 'Project Iota', 'Project Kappa'], + 'Total Price': [500000, 300000, 800000, 150000, 200000, + 1200000, 50000, 100000, 450000, 600000], + 'Approved date': [None, None, days_ago_90.strftime('%Y-%m-%d'), None, None, + days_ago_90.strftime('%Y-%m-%d'), None, None, days_ago_30.strftime('%Y-%m-%d'), None], + 'Lost Type': ['', 'Price', '', '', 'Spec', + '', '', 'Price', '', ''] + } + + return pd.DataFrame(data) + + +def test_preprocess(): + """Test data preprocessing""" + print("=" * 50) + print("Test 1: Data Preprocessing") + print("=" * 50) + + df = create_sample_data() + analyzer = DITAnalyzer(dataframe=df) + + processed_df = analyzer.get_dataframe() + + print(f"Total records: {len(processed_df)}") + print(f"Columns: {list(processed_df.columns)}") + + assert 'Derived_Application' in processed_df.columns + assert 'Is_Lost' in processed_df.columns + assert 'Is_Active' in processed_df.columns + print("\n[PASS] Preprocess test passed!") + + +def test_resource_allocation(): + """Test Feature 6.2: High Value Resource Allocation""" + print("\n" + "=" * 50) + print("Test 2: Feature 6.2 Resource Allocation") + print("=" * 50) + + df = create_sample_data() + analyzer = DITAnalyzer(dataframe=df) + + results = analyzer.analyze_resource_allocation(top_percent=0.5, low_win_rate=0.5) + + print(f"Found {len(results)} high-value low-win-rate applications") + for card in results: + print(f"\n[CARD] {card['title']}") + print(f" Application: {card['application']}") + print(f" Potential Value: {card['money']}") + print(f" Win Rate: {card['win_rate']}%") + + print("\n[PASS] Resource allocation test passed!") + + +def test_stagnant_deals(): + """Test Feature 6.3: Stagnant Deal Alert""" + print("\n" + "=" * 50) + print("Test 3: Feature 6.3 Stagnant Deals") + print("=" * 50) + + df = create_sample_data() + analyzer = DITAnalyzer(dataframe=df) + + results = analyzer.analyze_stagnant_deals(threshold_days=60) + + print(f"Found {len(results)} stagnant deals") + for card in results: + print(f"\n[ALERT] {card['title']}") + print(f" Account: {card['account']}") + print(f" Project: {card['project']}") + print(f" Days Pending: {card['days_pending']}") + + print("\n[PASS] Stagnant deals test passed!") + + +def test_full_report(): + """Test full report generation""" + print("\n" + "=" * 50) + print("Test 4: Full Report Generation") + print("=" * 50) + + df = create_sample_data() + analyzer = DITAnalyzer(dataframe=df) + + report = analyzer.generate_report(top_percent=0.5, low_win_rate=0.5, threshold_days=60) + + print(f"\n[REPORT] Generated at: {report['generated_at']}") + summary = report['summary'] + print(f" Total Records: {summary['total_records']}") + print(f" Total Value: {summary['total_value']}") + print(f" Win Rate: {summary['win_rate']}") + + print(f"\n[ACTION CARDS] Total: {report['total_alerts']}") + print(f" - Resource Allocation: {len(report['action_cards']['resource_allocation'])}") + print(f" - Stagnant Deals: {len(report['action_cards']['stagnant_deals'])}") + + print("\n[PASS] Full report test passed!") + + +def main(): + """Run all tests""" + print("\n[START] DITAnalyzer Test Suite\n") + + try: + test_preprocess() + test_resource_allocation() + test_stagnant_deals() + test_full_report() + + print("\n" + "=" * 50) + print("[SUCCESS] All tests passed!") + print("=" * 50) + + except Exception as e: + print(f"\n[FAIL] Test failed: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 + + +if __name__ == '__main__': + exit(main())