""" DITAnalyzer Test Script """ import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import pandas as pd from datetime import datetime, timedelta from services.dit_analyzer import DITAnalyzer, DITAnalyzerError def create_sample_data(): """Create sample test data""" days_ago_90 = datetime.now() - timedelta(days=90) days_ago_30 = datetime.now() - timedelta(days=30) data = { 'Created Date': ['2024-01-01', '2024-02-01', '2024-03-01', '2024-04-01', '2024-05-01', '2024-01-15', '2024-02-15', '2024-03-15', '2024-04-15', '2024-05-15'], 'Account Name': ['CustomerA', 'CustomerB', 'CustomerA', 'CustomerC', 'CustomerD', 'CustomerE', 'CustomerF', 'CustomerA', 'CustomerG', 'CustomerH'], 'Stage': ['Won', 'Opportunity Lost', 'Negotiation', 'Won', 'Design-Lost', 'Negotiation', 'Mass Production', 'Opportunity Lost', 'Negotiation', 'Won'], 'Application': ['Automotive', '', 'Automotive', 'IoT', '', 'Automotive', 'Consumer', '', 'Industrial', 'Automotive'], 'Application Detail': ['', 'Consumer Electronics', '', '', 'Smart Home', '', '', 'Power Supply', '', ''], 'Opportunity Name': ['Project Alpha', 'Project Beta', 'Project Gamma', 'Project Delta', 'Project Epsilon', 'Project Zeta', 'Project Eta', 'Project Theta', 'Project Iota', 'Project Kappa'], 'Total Price': [500000, 300000, 800000, 150000, 200000, 1200000, 50000, 100000, 450000, 600000], 'Approved date': [None, None, days_ago_90.strftime('%Y-%m-%d'), None, None, days_ago_90.strftime('%Y-%m-%d'), None, None, days_ago_30.strftime('%Y-%m-%d'), None], 'Lost Type': ['', 'Price', '', '', 'Spec', '', '', 'Price', '', ''] } return pd.DataFrame(data) def test_preprocess(): """Test data preprocessing""" print("=" * 50) print("Test 1: Data Preprocessing") print("=" * 50) df = create_sample_data() analyzer = DITAnalyzer(dataframe=df) processed_df = analyzer.get_dataframe() print(f"Total records: {len(processed_df)}") print(f"Columns: {list(processed_df.columns)}") assert 'Derived_Application' in processed_df.columns assert 'Is_Lost' in processed_df.columns assert 'Is_Active' in processed_df.columns print("\n[PASS] Preprocess test passed!") def test_resource_allocation(): """Test Feature 6.2: High Value Resource Allocation""" print("\n" + "=" * 50) print("Test 2: Feature 6.2 Resource Allocation") print("=" * 50) df = create_sample_data() analyzer = DITAnalyzer(dataframe=df) results = analyzer.analyze_resource_allocation(top_percent=0.5, low_win_rate=0.5) print(f"Found {len(results)} high-value low-win-rate applications") for card in results: print(f"\n[CARD] {card['title']}") print(f" Application: {card['application']}") print(f" Potential Value: {card['money']}") print(f" Win Rate: {card['win_rate']}%") print("\n[PASS] Resource allocation test passed!") def test_stagnant_deals(): """Test Feature 6.3: Stagnant Deal Alert""" print("\n" + "=" * 50) print("Test 3: Feature 6.3 Stagnant Deals") print("=" * 50) df = create_sample_data() analyzer = DITAnalyzer(dataframe=df) results = analyzer.analyze_stagnant_deals(threshold_days=60) print(f"Found {len(results)} stagnant deals") for card in results: print(f"\n[ALERT] {card['title']}") print(f" Account: {card['account']}") print(f" Project: {card['project']}") print(f" Days Pending: {card['days_pending']}") print("\n[PASS] Stagnant deals test passed!") def test_full_report(): """Test full report generation""" print("\n" + "=" * 50) print("Test 4: Full Report Generation") print("=" * 50) df = create_sample_data() analyzer = DITAnalyzer(dataframe=df) report = analyzer.generate_report(top_percent=0.5, low_win_rate=0.5, threshold_days=60) print(f"\n[REPORT] Generated at: {report['generated_at']}") summary = report['summary'] print(f" Total Records: {summary['total_records']}") print(f" Total Value: {summary['total_value']}") print(f" Win Rate: {summary['win_rate']}") print(f"\n[ACTION CARDS] Total: {report['total_alerts']}") print(f" - Resource Allocation: {len(report['action_cards']['resource_allocation'])}") print(f" - Stagnant Deals: {len(report['action_cards']['stagnant_deals'])}") print("\n[PASS] Full report test passed!") def main(): """Run all tests""" print("\n[START] DITAnalyzer Test Suite\n") try: test_preprocess() test_resource_allocation() test_stagnant_deals() test_full_report() print("\n" + "=" * 50) print("[SUCCESS] All tests passed!") print("=" * 50) except Exception as e: print(f"\n[FAIL] Test failed: {e}") import traceback traceback.print_exc() return 1 return 0 if __name__ == '__main__': exit(main())