This commit is contained in:
2025-12-12 16:03:19 +08:00
14 changed files with 933 additions and 0 deletions

19
.env.example Normal file
View File

@@ -0,0 +1,19 @@
# Flask
FLASK_SECRET_KEY=your-secret-key-change-in-production
FLASK_DEBUG=True
# Database (MySQL)
DB_HOST=your-db-host
DB_PORT=3306
DB_NAME=your-db-name
DB_USER=your-db-user
DB_PASSWORD=your-db-password
# Ollama API
OLLAMA_API_URL=https://your-ollama-api-url
OLLAMA_DEFAULT_MODEL=qwen2.5:3b
# Gitea
GITEA_URL=https://your-gitea-url
GITEA_USER=your-username
GITEA_TOKEN=your-token

34
README.md Normal file
View File

@@ -0,0 +1,34 @@
# DIT_C
## 快速開始
```bash
# 安裝依賴
pip install -r requirements.txt
# 設定環境變數
cp .env.example .env
# 編輯 .env 填入正確的資料庫與 API 資訊
# 啟動服務
python app.py
```
## 測試端點
- 健康檢查: `GET /health`
- LLM 連線測試: `GET /test-llm`
## 專案結構
```
DIT_C/
├── app.py # 主程式入口
├── config.py # 設定檔
├── models/ # 資料庫模型
├── routes/ # 路由模組
├── services/ # 商業邏輯 (含 LLM 服務)
├── utils/ # 工具函式
├── templates/ # HTML 模板
└── static/ # 靜態資源
```

46
app.py Normal file
View File

@@ -0,0 +1,46 @@
"""
DIT_C 主程式入口
"""
from flask import Flask, jsonify
from config import Config
from models import db
def create_app(config_class=Config):
"""應用程式工廠"""
app = Flask(__name__)
app.config.from_object(config_class)
# 初始化擴展
db.init_app(app)
# 註冊 Blueprint
from routes.api import api_bp
app.register_blueprint(api_bp)
# 健康檢查端點
@app.route('/health')
def health_check():
return jsonify({"status": "ok", "message": "DIT_C is running"})
# 測試 LLM 連線
@app.route('/test-llm')
def test_llm():
from services.llm_service import llm_service, LLMServiceError
try:
models = llm_service.get_available_models()
return jsonify({
"status": "ok",
"available_models": models,
"default_model": Config.OLLAMA_DEFAULT_MODEL
})
except LLMServiceError as e:
return jsonify({"status": "error", "message": str(e)}), 500
return app
if __name__ == '__main__':
app = create_app()
app.run(host='0.0.0.0', port=5000, debug=Config.DEBUG)

37
config.py Normal file
View File

@@ -0,0 +1,37 @@
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
"""應用程式設定"""
# Flask
SECRET_KEY = os.getenv('FLASK_SECRET_KEY', 'dev-secret-key')
DEBUG = os.getenv('FLASK_DEBUG', 'False').lower() == 'true'
# Database (MySQL)
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 3306))
DB_NAME = os.getenv('DB_NAME', 'database')
DB_USER = os.getenv('DB_USER', 'root')
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
# SQLAlchemy 連線字串
SQLALCHEMY_DATABASE_URI = (
f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Table 前綴
TABLE_PREFIX = 'DIT_C_'
# Ollama API
OLLAMA_API_URL = os.getenv('OLLAMA_API_URL', 'https://ollama_pjapi.theaken.com')
OLLAMA_DEFAULT_MODEL = os.getenv('OLLAMA_DEFAULT_MODEL', 'qwen2.5:3b')
# Gitea
GITEA_URL = os.getenv('GITEA_URL', '')
GITEA_USER = os.getenv('GITEA_USER', '')
GITEA_TOKEN = os.getenv('GITEA_TOKEN', '')

3
models/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

10
requirements.txt Normal file
View File

@@ -0,0 +1,10 @@
flask>=3.0.0
flask-sqlalchemy>=3.1.0
flask-login>=0.6.3
python-dotenv>=1.0.0
pymysql>=1.1.0
cryptography>=41.0.0
requests>=2.31.0
pandas>=2.0.0
numpy>=1.24.0
werkzeug>=3.0.0

1
routes/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Routes module

155
routes/api.py Normal file
View File

@@ -0,0 +1,155 @@
"""
DIT 分析 API 路由
"""
import os
import json
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename
from services.dit_analyzer import DITAnalyzer, DITAnalyzerError
api_bp = Blueprint('api', __name__, url_prefix='/api')
ALLOWED_EXTENSIONS = {'csv'}
UPLOAD_FOLDER = 'uploads'
def allowed_file(filename: str) -> bool:
"""檢查檔案副檔名"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@api_bp.route('/analyze', methods=['POST'])
def analyze_dit():
"""
分析 DIT CSV 檔案
接受 multipart/form-data 上傳 CSV
回傳 JSON 格式分析結果
"""
# 檢查檔案
if 'file' not in request.files:
return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "未選擇檔案", "code": "NO_FILENAME"}), 400
if not allowed_file(file.filename):
return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400
# 取得參數
top_percent = float(request.form.get('top_percent', 0.2))
low_win_rate = float(request.form.get('low_win_rate', 0.1))
threshold_days = int(request.form.get('threshold_days', 60))
try:
# 確保上傳目錄存在
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# 儲存檔案
filename = secure_filename(file.filename)
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
# 執行分析
analyzer = DITAnalyzer(filepath)
report = analyzer.generate_report(
top_percent=top_percent,
low_win_rate=low_win_rate,
threshold_days=threshold_days
)
# 清理暫存檔
os.remove(filepath)
return jsonify({
"status": "success",
"data": report
})
except DITAnalyzerError as e:
return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400
except Exception as e:
return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500
@api_bp.route('/analyze/resource-allocation', methods=['POST'])
def analyze_resource_allocation():
"""
僅執行 Feature 6.2: 高價值資源分配分析
"""
if 'file' not in request.files:
return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400
file = request.files['file']
if not allowed_file(file.filename):
return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400
top_percent = float(request.form.get('top_percent', 0.2))
low_win_rate = float(request.form.get('low_win_rate', 0.1))
try:
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
filename = secure_filename(file.filename)
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
analyzer = DITAnalyzer(filepath)
results = analyzer.analyze_resource_allocation(top_percent, low_win_rate)
os.remove(filepath)
return jsonify({
"status": "success",
"data": {
"type": "resource_allocation",
"count": len(results),
"action_cards": results
}
})
except DITAnalyzerError as e:
return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400
except Exception as e:
return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500
@api_bp.route('/analyze/stagnant-deals', methods=['POST'])
def analyze_stagnant_deals():
"""
僅執行 Feature 6.3: 呆滯案件警示
"""
if 'file' not in request.files:
return jsonify({"error": "未上傳檔案", "code": "NO_FILE"}), 400
file = request.files['file']
if not allowed_file(file.filename):
return jsonify({"error": "僅支援 CSV 檔案", "code": "INVALID_TYPE"}), 400
threshold_days = int(request.form.get('threshold_days', 60))
try:
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
filename = secure_filename(file.filename)
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
analyzer = DITAnalyzer(filepath)
results = analyzer.analyze_stagnant_deals(threshold_days)
os.remove(filepath)
return jsonify({
"status": "success",
"data": {
"type": "stagnant_deals",
"count": len(results),
"action_cards": results
}
})
except DITAnalyzerError as e:
return jsonify({"error": str(e), "code": "ANALYZER_ERROR"}), 400
except Exception as e:
return jsonify({"error": f"分析失敗: {str(e)}", "code": "INTERNAL_ERROR"}), 500

1
services/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Services module

323
services/dit_analyzer.py Normal file
View File

@@ -0,0 +1,323 @@
"""
DIT 智能分析模組
解析 DIT CSV 報表,產出行動建議卡片 (Action Cards)
"""
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List, Dict, Optional, Any
class DITAnalyzer:
"""DIT 報表分析器"""
def __init__(self, file_path: Optional[str] = None, dataframe: Optional[pd.DataFrame] = None):
"""
初始化分析器
Args:
file_path: CSV 檔案路徑
dataframe: 或直接傳入 DataFrame
"""
self.df: Optional[pd.DataFrame] = None
self.processed: bool = False
if file_path:
self.load_data(file_path)
elif dataframe is not None:
self.df = dataframe.copy()
self._preprocess()
def load_data(self, file_path: str) -> 'DITAnalyzer':
"""
載入 CSV 資料
Args:
file_path: CSV 檔案路徑
Returns:
self (支援鏈式呼叫)
"""
try:
self.df = pd.read_csv(file_path, encoding='utf-8')
except UnicodeDecodeError:
self.df = pd.read_csv(file_path, encoding='cp950')
except Exception as e:
raise DITAnalyzerError(f"無法載入檔案: {e}")
self._preprocess()
return self
def _preprocess(self) -> None:
"""執行資料清洗與預處理"""
if self.df is None:
raise DITAnalyzerError("尚未載入資料")
# 1. 欄位清洗:移除欄位名稱前後空白
self.df.columns = self.df.columns.str.strip()
# 2. 日期轉換
date_columns = ['Created Date', 'Approved date', 'Close Date']
for col in date_columns:
if col in self.df.columns:
self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
# 3. 數值轉換Total Price
if 'Total Price' in self.df.columns:
self.df['Total Price'] = pd.to_numeric(
self.df['Total Price'].astype(str).str.replace(',', ''),
errors='coerce'
).fillna(0)
# 4. 應用領域推導 (Derived_Application)
self.df['Derived_Application'] = self._derive_application()
# 5. 狀態標記
if 'Stage' in self.df.columns:
self.df['Is_Lost'] = self.df['Stage'].str.contains(
'Lost', case=False, na=False
)
self.df['Is_Active'] = ~self.df['Is_Lost']
else:
self.df['Is_Lost'] = False
self.df['Is_Active'] = True
self.processed = True
def _derive_application(self) -> pd.Series:
"""
推導應用領域
優先順序: Application → Application Detail → Opportunity Name → "Unknown"
"""
def get_app(row):
# 檢查 Application
if 'Application' in row.index:
val = row.get('Application')
if pd.notna(val) and str(val).strip():
return str(val).strip()
# 檢查 Application Detail
if 'Application Detail' in row.index:
val = row.get('Application Detail')
if pd.notna(val) and str(val).strip():
return str(val).strip()
# 檢查 Opportunity Name
if 'Opportunity Name' in row.index:
val = row.get('Opportunity Name')
if pd.notna(val) and str(val).strip():
return str(val).strip()
return "Unknown"
return self.df.apply(get_app, axis=1)
def analyze_resource_allocation(
self,
top_percent: float = 0.2,
low_win_rate: float = 0.1
) -> List[Dict[str, Any]]:
"""
Feature 6.2: 高價值資源分配建議
找出「金礦區」— 金額大但勝率低的應用領域
Args:
top_percent: 金額排名前 X% (預設 20%)
low_win_rate: 勝率門檻 (預設 10%)
Returns:
Action Cards 列表
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
# 依 Derived_Application 分組
grouped = self.df.groupby('Derived_Application').agg({
'Total Price': 'sum',
'Is_Active': 'mean',
'Account Name': lambda x: x.value_counts().head(3).index.tolist()
}).reset_index()
grouped.columns = ['Application', 'Sum_Total_Price', 'Win_Rate', 'Top_Accounts']
# 排序並取 Top 20%
grouped = grouped.sort_values('Sum_Total_Price', ascending=False)
top_n = max(1, int(len(grouped) * top_percent))
top_apps = grouped.head(top_n)
# 篩選勝率低於門檻的
low_win_apps = top_apps[top_apps['Win_Rate'] < low_win_rate]
# 產出 Action Cards
action_cards = []
for _, row in low_win_apps.iterrows():
money_formatted = f"${row['Sum_Total_Price']:,.0f}"
win_rate_pct = f"{row['Win_Rate'] * 100:.1f}"
top_accounts = ', '.join(row['Top_Accounts'][:3]) if row['Top_Accounts'] else ''
action_cards.append({
"type": "resource_allocation",
"title": "高潛力市場攻堅提醒",
"application": row['Application'],
"money": money_formatted,
"money_raw": row['Sum_Total_Price'],
"win_rate": win_rate_pct,
"win_rate_raw": row['Win_Rate'],
"top_accounts": row['Top_Accounts'][:3] if row['Top_Accounts'] else [],
"suggestion": (
f"{row['Application']} 領域潛在商機巨大 ({money_formatted})"
f"但目前勝率偏低 ({win_rate_pct}%)。"
f"建議指派資深 FAE 介入該領域的前三大案子 (如 {top_accounts})。"
)
})
return action_cards
def analyze_stagnant_deals(
self,
threshold_days: int = 60,
reference_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""
Feature 6.3: 呆滯案件警示
針對技術已承認但商務卡關的案子進行催單
Args:
threshold_days: 呆滯天數門檻 (預設 60 天)
reference_date: 參考日期 (預設為當前日期)
Returns:
Action Cards 列表
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
if reference_date is None:
reference_date = datetime.now()
# 檢查必要欄位
if 'Approved date' not in self.df.columns:
return []
# 篩選條件
mask = (
(self.df['Stage'].str.contains('Negotiation', case=False, na=False)) &
(self.df['Approved date'].notna())
)
filtered = self.df[mask].copy()
if filtered.empty:
return []
# 計算呆滯天數
filtered['Days_Since_Approved'] = (
reference_date - filtered['Approved date']
).dt.days
# 篩選超過門檻的
stagnant = filtered[filtered['Days_Since_Approved'] > threshold_days]
# 產出 Action Cards
action_cards = []
for _, row in stagnant.iterrows():
days = int(row['Days_Since_Approved'])
months = days // 30
account = row.get('Account Name', 'Unknown')
project = row.get('Opportunity Name', 'Unknown')
approved_date = row['Approved date'].strftime('%Y-%m-%d') if pd.notna(row['Approved date']) else 'N/A'
action_cards.append({
"type": "stagnant_deal",
"title": "呆滯案件喚醒",
"account": account,
"project": project,
"approved_date": approved_date,
"days_pending": days,
"months_pending": months,
"suggestion": (
f"客戶 {account}{project} 已承認超過 {months} 個月 ({days} 天),仍未轉單。"
f"請業務確認是否為「價格」或「庫存」問題。若無下文,應要求客戶給出 Forecast。"
)
})
# 依天數排序 (最久的在前)
action_cards.sort(key=lambda x: x['days_pending'], reverse=True)
return action_cards
def generate_report(
self,
top_percent: float = 0.2,
low_win_rate: float = 0.1,
threshold_days: int = 60
) -> Dict[str, Any]:
"""
彙整所有分析結果
Args:
top_percent: 高價值分析的金額門檻
low_win_rate: 高價值分析的勝率門檻
threshold_days: 呆滯分析的天數門檻
Returns:
完整分析報告 (Dict)
"""
if not self.processed:
raise DITAnalyzerError("資料尚未預處理")
allocation_suggestions = self.analyze_resource_allocation(top_percent, low_win_rate)
stagnant_alerts = self.analyze_stagnant_deals(threshold_days)
# 統計摘要
summary = self._generate_summary()
return {
"generated_at": datetime.now().isoformat(),
"summary": summary,
"action_cards": {
"resource_allocation": allocation_suggestions,
"stagnant_deals": stagnant_alerts
},
"total_alerts": len(allocation_suggestions) + len(stagnant_alerts)
}
def _generate_summary(self) -> Dict[str, Any]:
"""產生統計摘要"""
total_records = len(self.df)
total_value = self.df['Total Price'].sum()
active_count = self.df['Is_Active'].sum()
lost_count = self.df['Is_Lost'].sum()
# 各階段統計
stage_stats = {}
if 'Stage' in self.df.columns:
stage_stats = self.df['Stage'].value_counts().to_dict()
# 應用領域 Top 5
app_stats = self.df.groupby('Derived_Application')['Total Price'].sum()
top_apps = app_stats.nlargest(5).to_dict()
return {
"total_records": total_records,
"total_value": f"${total_value:,.0f}",
"total_value_raw": total_value,
"active_count": int(active_count),
"lost_count": int(lost_count),
"win_rate": f"{(active_count / total_records * 100):.1f}%" if total_records > 0 else "0%",
"stage_distribution": stage_stats,
"top_applications": {k: f"${v:,.0f}" for k, v in top_apps.items()}
}
def get_dataframe(self) -> pd.DataFrame:
"""取得處理後的 DataFrame"""
if self.df is None:
raise DITAnalyzerError("尚未載入資料")
return self.df.copy()
class DITAnalyzerError(Exception):
"""DIT 分析器錯誤"""
pass

150
services/llm_service.py Normal file
View File

@@ -0,0 +1,150 @@
"""
Ollama LLM API 服務模組
支援一般請求與串流模式
"""
import requests
import json
from typing import Generator, Optional
from config import Config
class LLMService:
"""Ollama API 服務封裝"""
def __init__(self, api_url: str = None, default_model: str = None):
self.api_url = api_url or Config.OLLAMA_API_URL
self.default_model = default_model or Config.OLLAMA_DEFAULT_MODEL
def get_available_models(self) -> list:
"""取得可用模型列表"""
try:
response = requests.get(f"{self.api_url}/v1/models", timeout=10)
response.raise_for_status()
models = response.json()
return [m['id'] for m in models.get('data', [])]
except requests.RequestException as e:
raise LLMServiceError(f"無法取得模型列表: {e}")
def chat(
self,
messages: list,
model: str = None,
temperature: float = 0.7,
system_prompt: str = None
) -> str:
"""
發送聊天請求 (非串流)
Args:
messages: 訊息列表 [{"role": "user", "content": "..."}]
model: 模型名稱
temperature: 溫度參數 (0-1)
system_prompt: 系統提示詞
Returns:
AI 回應內容
"""
model = model or self.default_model
# 加入系統提示詞
if system_prompt:
messages = [{"role": "system", "content": system_prompt}] + messages
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"stream": False
}
try:
response = requests.post(
f"{self.api_url}/v1/chat/completions",
json=payload,
timeout=60
)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
except requests.RequestException as e:
raise LLMServiceError(f"聊天請求失敗: {e}")
def chat_stream(
self,
messages: list,
model: str = None,
temperature: float = 0.7,
system_prompt: str = None
) -> Generator[str, None, None]:
"""
發送聊天請求 (串流模式)
Args:
messages: 訊息列表
model: 模型名稱
temperature: 溫度參數
system_prompt: 系統提示詞
Yields:
串流回應的每個片段
"""
model = model or self.default_model
if system_prompt:
messages = [{"role": "system", "content": system_prompt}] + messages
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"stream": True
}
try:
response = requests.post(
f"{self.api_url}/v1/chat/completions",
json=payload,
stream=True,
timeout=120
)
response.raise_for_status()
for line in response.iter_lines():
if line:
if line.startswith(b"data: "):
data_str = line[6:].decode('utf-8')
if data_str.strip() != "[DONE]":
try:
data = json.loads(data_str)
if 'choices' in data:
delta = data['choices'][0].get('delta', {})
if 'content' in delta:
yield delta['content']
except json.JSONDecodeError:
continue
except requests.RequestException as e:
raise LLMServiceError(f"串流請求失敗: {e}")
def simple_query(self, prompt: str, system_prompt: str = None) -> str:
"""
簡單查詢 (單一問題)
Args:
prompt: 使用者問題
system_prompt: 系統提示詞
Returns:
AI 回應
"""
messages = [{"role": "user", "content": prompt}]
return self.chat(messages, system_prompt=system_prompt)
class LLMServiceError(Exception):
"""LLM 服務錯誤"""
pass
# 全域實例
llm_service = LLMService()

0
tests/__init__.py Normal file
View File

153
tests/test_dit_analyzer.py Normal file
View File

@@ -0,0 +1,153 @@
"""
DITAnalyzer Test Script
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pandas as pd
from datetime import datetime, timedelta
from services.dit_analyzer import DITAnalyzer, DITAnalyzerError
def create_sample_data():
"""Create sample test data"""
days_ago_90 = datetime.now() - timedelta(days=90)
days_ago_30 = datetime.now() - timedelta(days=30)
data = {
'Created Date': ['2024-01-01', '2024-02-01', '2024-03-01', '2024-04-01', '2024-05-01',
'2024-01-15', '2024-02-15', '2024-03-15', '2024-04-15', '2024-05-15'],
'Account Name': ['CustomerA', 'CustomerB', 'CustomerA', 'CustomerC', 'CustomerD',
'CustomerE', 'CustomerF', 'CustomerA', 'CustomerG', 'CustomerH'],
'Stage': ['Won', 'Opportunity Lost', 'Negotiation', 'Won', 'Design-Lost',
'Negotiation', 'Mass Production', 'Opportunity Lost', 'Negotiation', 'Won'],
'Application': ['Automotive', '', 'Automotive', 'IoT', '',
'Automotive', 'Consumer', '', 'Industrial', 'Automotive'],
'Application Detail': ['', 'Consumer Electronics', '', '', 'Smart Home',
'', '', 'Power Supply', '', ''],
'Opportunity Name': ['Project Alpha', 'Project Beta', 'Project Gamma', 'Project Delta', 'Project Epsilon',
'Project Zeta', 'Project Eta', 'Project Theta', 'Project Iota', 'Project Kappa'],
'Total Price': [500000, 300000, 800000, 150000, 200000,
1200000, 50000, 100000, 450000, 600000],
'Approved date': [None, None, days_ago_90.strftime('%Y-%m-%d'), None, None,
days_ago_90.strftime('%Y-%m-%d'), None, None, days_ago_30.strftime('%Y-%m-%d'), None],
'Lost Type': ['', 'Price', '', '', 'Spec',
'', '', 'Price', '', '']
}
return pd.DataFrame(data)
def test_preprocess():
"""Test data preprocessing"""
print("=" * 50)
print("Test 1: Data Preprocessing")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
processed_df = analyzer.get_dataframe()
print(f"Total records: {len(processed_df)}")
print(f"Columns: {list(processed_df.columns)}")
assert 'Derived_Application' in processed_df.columns
assert 'Is_Lost' in processed_df.columns
assert 'Is_Active' in processed_df.columns
print("\n[PASS] Preprocess test passed!")
def test_resource_allocation():
"""Test Feature 6.2: High Value Resource Allocation"""
print("\n" + "=" * 50)
print("Test 2: Feature 6.2 Resource Allocation")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
results = analyzer.analyze_resource_allocation(top_percent=0.5, low_win_rate=0.5)
print(f"Found {len(results)} high-value low-win-rate applications")
for card in results:
print(f"\n[CARD] {card['title']}")
print(f" Application: {card['application']}")
print(f" Potential Value: {card['money']}")
print(f" Win Rate: {card['win_rate']}%")
print("\n[PASS] Resource allocation test passed!")
def test_stagnant_deals():
"""Test Feature 6.3: Stagnant Deal Alert"""
print("\n" + "=" * 50)
print("Test 3: Feature 6.3 Stagnant Deals")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
results = analyzer.analyze_stagnant_deals(threshold_days=60)
print(f"Found {len(results)} stagnant deals")
for card in results:
print(f"\n[ALERT] {card['title']}")
print(f" Account: {card['account']}")
print(f" Project: {card['project']}")
print(f" Days Pending: {card['days_pending']}")
print("\n[PASS] Stagnant deals test passed!")
def test_full_report():
"""Test full report generation"""
print("\n" + "=" * 50)
print("Test 4: Full Report Generation")
print("=" * 50)
df = create_sample_data()
analyzer = DITAnalyzer(dataframe=df)
report = analyzer.generate_report(top_percent=0.5, low_win_rate=0.5, threshold_days=60)
print(f"\n[REPORT] Generated at: {report['generated_at']}")
summary = report['summary']
print(f" Total Records: {summary['total_records']}")
print(f" Total Value: {summary['total_value']}")
print(f" Win Rate: {summary['win_rate']}")
print(f"\n[ACTION CARDS] Total: {report['total_alerts']}")
print(f" - Resource Allocation: {len(report['action_cards']['resource_allocation'])}")
print(f" - Stagnant Deals: {len(report['action_cards']['stagnant_deals'])}")
print("\n[PASS] Full report test passed!")
def main():
"""Run all tests"""
print("\n[START] DITAnalyzer Test Suite\n")
try:
test_preprocess()
test_resource_allocation()
test_stagnant_deals()
test_full_report()
print("\n" + "=" * 50)
print("[SUCCESS] All tests passed!")
print("=" * 50)
except Exception as e:
print(f"\n[FAIL] Test failed: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
exit(main())

1
utils/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Utils module