Files
DashBoard/tests/test_resource_history_service.py
2026-02-08 08:30:48 +08:00

447 lines
16 KiB
Python

# -*- coding: utf-8 -*-
"""Unit tests for resource_history_service.py.
Tests the service layer functions for resource history analysis.
"""
import unittest
from unittest.mock import patch, MagicMock
from datetime import datetime, timedelta
import pandas as pd
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from mes_dashboard.services.resource_history_service import (
get_filter_options,
query_summary,
query_detail,
export_csv,
_validate_date_range,
_get_date_trunc,
_calc_ou_pct,
_calc_availability_pct,
_build_kpi_from_df,
_build_detail_from_raw_df,
MAX_QUERY_DAYS,
)
class TestValidateDateRange(unittest.TestCase):
    """Validation of start/end date range strings."""

    def test_valid_date_range(self):
        """A well-formed range within the limit yields no error (None)."""
        self.assertIsNone(_validate_date_range('2024-01-01', '2024-01-31'))

    def test_date_range_exceeds_max(self):
        """A range longer than MAX_QUERY_DAYS yields an error naming the limit."""
        message = _validate_date_range('2024-01-01', '2026-01-02')
        self.assertIsNotNone(message)
        self.assertIn('730', message)

    def test_end_date_before_start_date(self):
        """A reversed range yields an error mentioning the start date."""
        message = _validate_date_range('2024-01-31', '2024-01-01')
        self.assertIsNotNone(message)
        self.assertIn('起始日期', message)

    def test_invalid_date_format(self):
        """An unparsable date string yields a format error."""
        message = _validate_date_range('invalid', '2024-01-01')
        self.assertIsNotNone(message)
        self.assertIn('日期格式錯誤', message)
class TestGetDateTrunc(unittest.TestCase):
    """SQL date-truncation snippet generation per granularity."""

    def test_day_granularity(self):
        """'day' truncates TXNDATE without a format argument."""
        sql = _get_date_trunc('day')
        self.assertIn('TRUNC(TXNDATE)', sql)
        self.assertNotIn('IW', sql)

    def test_week_granularity(self):
        """'week' uses Oracle's ISO-week ('IW') format."""
        self.assertIn("'IW'", _get_date_trunc('week'))

    def test_month_granularity(self):
        """'month' uses the 'MM' format."""
        self.assertIn("'MM'", _get_date_trunc('month'))

    def test_year_granularity(self):
        """'year' uses the 'YYYY' format."""
        self.assertIn("'YYYY'", _get_date_trunc('year'))

    def test_unknown_granularity(self):
        """An unrecognized granularity falls back to plain day truncation."""
        sql = _get_date_trunc('unknown')
        self.assertIn('TRUNC(TXNDATE)', sql)
        self.assertNotIn("'IW'", sql)
class TestCalcOuPct(unittest.TestCase):
    """OU% (operational utilization) computation."""

    def test_normal_calculation(self):
        """PRD over the sum of all classified hours, as a percentage."""
        # OU% = 800 / (800 + 100 + 50 + 30 + 20) * 100 = 80%
        self.assertEqual(_calc_ou_pct(800, 100, 50, 30, 20), 80.0)

    def test_zero_denominator(self):
        """All-zero hours must yield 0 rather than raise ZeroDivisionError."""
        self.assertEqual(_calc_ou_pct(0, 0, 0, 0, 0), 0)

    def test_all_prd(self):
        """Pure production time maps to 100% OU."""
        self.assertEqual(_calc_ou_pct(100, 0, 0, 0, 0), 100.0)

    def test_no_prd(self):
        """Zero production time maps to 0% OU regardless of other buckets."""
        self.assertEqual(_calc_ou_pct(0, 100, 50, 30, 20), 0)
class TestCalcAvailabilityPct(unittest.TestCase):
    """Test Availability% calculation.

    Positional order of _calc_availability_pct is (prd, sby, udt, sdt,
    egt, nst): numerator = PRD + SBY + EGT, denominator = sum of all six.
    """

    def test_normal_calculation(self):
        """Availability% should be calculated correctly."""
        # PRD=800, SBY=100, UDT=50, SDT=30, EGT=20, NST=100
        # Availability% = (800+100+20) / (800+100+20+30+50+100) * 100
        #               = 920 / 1100 * 100 = 83.6% (rounded to one decimal)
        result = _calc_availability_pct(800, 100, 50, 30, 20, 100)
        self.assertEqual(result, 83.6)

    def test_zero_denominator(self):
        """Zero denominator should return 0, not error."""
        result = _calc_availability_pct(0, 0, 0, 0, 0, 0)
        self.assertEqual(result, 0)

    def test_all_available(self):
        """100% available (no SDT, UDT, NST) should result in 100%."""
        # PRD=100, SBY=50, EGT=50, no UDT/SDT/NST
        # Availability% = (100+50+50) / (100+50+50+0+0+0) * 100 = 100%
        result = _calc_availability_pct(100, 50, 0, 0, 50, 0)
        self.assertEqual(result, 100.0)

    def test_no_available_time(self):
        """No available time (all UDT/SDT/NST) should result in 0%."""
        # Given the (prd, sby, udt, sdt, egt, nst) argument order, this call
        # passes UDT=50, SDT=30, NST=20.  (The previous comment had UDT and
        # SDT swapped; the result is 0% either way.)
        # Availability% = 0 / (0+0+50+30+0+20) * 100 = 0%
        result = _calc_availability_pct(0, 0, 50, 30, 0, 20)
        self.assertEqual(result, 0)

    def test_mixed_scenario(self):
        """Mixed scenario with partial availability."""
        # PRD=500, SBY=200, UDT=100, SDT=100, EGT=50, NST=50
        # Numerator   = PRD + SBY + EGT = 500 + 200 + 50 = 750
        # Denominator = 500 + 200 + 100 + 100 + 50 + 50 = 1000
        # Availability% = 750 / 1000 * 100 = 75%
        result = _calc_availability_pct(500, 200, 100, 100, 50, 50)
        self.assertEqual(result, 75.0)
class TestBuildKpiFromDf(unittest.TestCase):
    """KPI dict construction from an aggregate-query DataFrame."""

    def test_empty_dataframe(self):
        """An empty frame yields all-zero KPI defaults."""
        kpi = _build_kpi_from_df(pd.DataFrame())
        self.assertEqual(kpi['ou_pct'], 0)
        self.assertEqual(kpi['availability_pct'], 0)
        self.assertEqual(kpi['prd_hours'], 0)
        self.assertEqual(kpi['machine_count'], 0)

    def test_normal_dataframe(self):
        """A populated row produces the expected rounded percentages."""
        row = {
            'PRD_HOURS': 800,
            'SBY_HOURS': 100,
            'UDT_HOURS': 50,
            'SDT_HOURS': 30,
            'EGT_HOURS': 20,
            'NST_HOURS': 100,
            'MACHINE_COUNT': 10,
        }
        kpi = _build_kpi_from_df(pd.DataFrame([row]))
        # OU% = 800 / (800+100+50+30+20) * 100 = 80%
        self.assertEqual(kpi['ou_pct'], 80.0)
        # Availability% = (800+100+20) / (800+100+20+30+50+100) * 100 = 920/1100 = 83.6%
        self.assertEqual(kpi['availability_pct'], 83.6)
        self.assertEqual(kpi['prd_hours'], 800)
        self.assertEqual(kpi['machine_count'], 10)

    def test_none_values_in_dataframe(self):
        """NULLs coming back from the database must be coerced to 0."""
        columns = ['PRD_HOURS', 'SBY_HOURS', 'UDT_HOURS', 'SDT_HOURS',
                   'EGT_HOURS', 'NST_HOURS', 'MACHINE_COUNT']
        kpi = _build_kpi_from_df(pd.DataFrame([{name: None for name in columns}]))
        self.assertEqual(kpi['ou_pct'], 0)
        self.assertEqual(kpi['availability_pct'], 0)
        self.assertEqual(kpi['prd_hours'], 0)
        self.assertEqual(kpi['machine_count'], 0)
class TestBuildDetailFromDf(unittest.TestCase):
    """Per-resource detail-row construction from a raw query DataFrame."""

    def test_empty_dataframe(self):
        """An empty frame yields an empty list."""
        self.assertEqual(_build_detail_from_raw_df(pd.DataFrame(), {}), [])

    @patch('mes_dashboard.services.filter_cache.get_workcenter_mapping')
    def test_normal_dataframe(self, mock_wc_mapping):
        """A raw row is joined against the resource lookup and mapped to a detail row."""
        mock_wc_mapping.return_value = {
            'WC01': {'group': 'Group01', 'sequence': 1}
        }
        raw = pd.DataFrame([{
            'HISTORYID': 'RES01',
            'PRD_HOURS': 80,
            'SBY_HOURS': 10,
            'UDT_HOURS': 5,
            'SDT_HOURS': 3,
            'EGT_HOURS': 2,
            'NST_HOURS': 10,
            'TOTAL_HOURS': 110,
        }])
        lookup = {
            'RES01': {
                'RESOURCEID': 'RES01',
                'WORKCENTERNAME': 'WC01',
                'RESOURCEFAMILYNAME': 'FAM01',
                'RESOURCENAME': 'RES01',
            }
        }
        rows = _build_detail_from_raw_df(raw, lookup)
        self.assertEqual(len(rows), 1)
        first = rows[0]
        # Workcenter name is translated to its group via the mapping cache.
        self.assertEqual(first['workcenter'], 'Group01')
        self.assertEqual(first['family'], 'FAM01')
        self.assertEqual(first['resource'], 'RES01')
        self.assertEqual(first['machine_count'], 1)
        # OU% = 80 / (80+10+5+3+2) * 100 = 80%
        self.assertEqual(first['ou_pct'], 80.0)
class TestGetFilterOptions(unittest.TestCase):
    """Filter-option retrieval backed by the workcenter and resource caches."""

    # NOTE: patch decorators apply bottom-up, so the bottom decorator
    # (resource families) binds to the first mock parameter.
    @patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
    @patch('mes_dashboard.services.resource_cache.get_resource_families')
    def test_cache_failure(self, mock_families, mock_groups):
        """If the caches return nothing, the service reports None."""
        mock_groups.return_value = None
        mock_families.return_value = None
        self.assertIsNone(get_filter_options())

    @patch('mes_dashboard.services.filter_cache.get_workcenter_groups')
    @patch('mes_dashboard.services.resource_cache.get_resource_families')
    def test_successful_query(self, mock_families, mock_groups):
        """Populated caches are surfaced as workcenter groups plus families."""
        mock_groups.return_value = [
            {'name': '焊接_DB', 'sequence': 1},
            {'name': '成型', 'sequence': 4},
        ]
        mock_families.return_value = ['FAM01', 'FAM02']
        options = get_filter_options()
        self.assertIsNotNone(options)
        self.assertEqual(len(options['workcenter_groups']), 2)
        self.assertEqual(options['workcenter_groups'][0]['name'], '焊接_DB')
        self.assertEqual(options['families'], ['FAM01', 'FAM02'])
class TestQuerySummary(unittest.TestCase):
    """Summary query: KPI, trend, heatmap, and workcenter comparison."""

    def test_invalid_date_range(self):
        """A range beyond MAX_QUERY_DAYS is rejected with an error payload."""
        result = query_summary(
            start_date='2024-01-01',
            end_date='2026-01-02',  # More than 730 days
            granularity='day'
        )
        self.assertIsNotNone(result)
        self.assertIn('error', result)

    @patch('mes_dashboard.services.resource_history_service.read_sql_df')
    def test_successful_query(self, mock_read_sql):
        """With all four sub-queries mocked, every section is present."""
        kpi_frame = pd.DataFrame([{
            'PRD_HOURS': 800, 'SBY_HOURS': 100, 'UDT_HOURS': 50,
            'SDT_HOURS': 30, 'EGT_HOURS': 20, 'NST_HOURS': 100,
            'MACHINE_COUNT': 10
        }])
        trend_frame = pd.DataFrame([{
            'DATA_DATE': datetime(2024, 1, 1),
            'PRD_HOURS': 100, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
            'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10,
            'MACHINE_COUNT': 5
        }])
        heatmap_frame = pd.DataFrame([{
            'WORKCENTERNAME': 'WC01', 'DATA_DATE': datetime(2024, 1, 1),
            'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
            'SDT_HOURS': 3, 'EGT_HOURS': 2
        }])
        comparison_frame = pd.DataFrame([{
            'WORKCENTERNAME': 'WC01',
            'PRD_HOURS': 800, 'SBY_HOURS': 100, 'UDT_HOURS': 50,
            'SDT_HOURS': 30, 'EGT_HOURS': 20, 'MACHINE_COUNT': 10
        }])

        def fake_read_sql(sql):
            # Dispatch on SQL content rather than call order: the service
            # runs its queries on a ThreadPoolExecutor, so a side_effect
            # list would be consumed in a nondeterministic order.
            text = sql.upper()
            has_date = 'DATA_DATE' in text
            has_workcenter = 'WORKCENTERNAME' in text
            if has_date and has_workcenter:
                return heatmap_frame      # both columns -> heatmap query
            if has_date:
                return trend_frame        # date only -> trend query
            if has_workcenter:
                return comparison_frame   # workcenter only -> comparison query
            return kpi_frame              # neither -> overall KPI query

        mock_read_sql.side_effect = fake_read_sql
        result = query_summary(
            start_date='2024-01-01',
            end_date='2024-01-07',
            granularity='day'
        )
        self.assertIsNotNone(result)
        for section in ('kpi', 'trend', 'heatmap', 'workcenter_comparison'):
            self.assertIn(section, result)
class TestQueryDetail(unittest.TestCase):
    """Detail query: per-resource rows with total/truncation metadata."""

    def test_invalid_date_range(self):
        """A range beyond MAX_QUERY_DAYS is rejected with an error payload."""
        result = query_detail(
            start_date='2024-01-01',
            end_date='2026-01-02',  # More than 730 days
            granularity='day'
        )
        self.assertIsNotNone(result)
        self.assertIn('error', result)

    @patch('mes_dashboard.services.filter_cache.get_workcenter_mapping')
    @patch('mes_dashboard.services.resource_history_service._get_filtered_resources')
    @patch('mes_dashboard.services.resource_history_service.read_sql_df')
    def test_successful_query(self, mock_read_sql, mock_get_resources, mock_wc_mapping):
        """One matching resource yields one row, total=1, not truncated."""
        mock_get_resources.return_value = [
            {'RESOURCEID': 'RES01', 'WORKCENTERNAME': 'WC01',
             'RESOURCEFAMILYNAME': 'FAM01', 'RESOURCENAME': 'RES01'}
        ]
        mock_wc_mapping.return_value = {
            'WC01': {'group': 'Group01', 'sequence': 1}
        }
        # The detail query keys its rows by HISTORYID.
        mock_read_sql.return_value = pd.DataFrame([{
            'HISTORYID': 'RES01',
            'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
            'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10,
            'TOTAL_HOURS': 110
        }])
        result = query_detail(
            start_date='2024-01-01',
            end_date='2024-01-07',
            granularity='day',
        )
        self.assertIsNotNone(result)
        for key in ('data', 'total', 'truncated'):
            self.assertIn(key, result)
        self.assertEqual(result['total'], 1)
        self.assertFalse(result['truncated'])
class TestExportCsv(unittest.TestCase):
    """CSV export generator."""

    def test_invalid_date_range(self):
        """An over-long range yields an error row instead of data."""
        rows = list(export_csv(
            start_date='2024-01-01',
            end_date='2026-01-02',  # More than 730 days
        ))
        self.assertTrue(any('Error' in row for row in rows))

    @patch('mes_dashboard.services.filter_cache.get_workcenter_mapping')
    @patch('mes_dashboard.services.resource_history_service._get_filtered_resources')
    @patch('mes_dashboard.services.resource_history_service.read_sql_df')
    def test_successful_export(self, mock_read_sql, mock_get_filtered_resources, mock_wc_mapping):
        """A successful export streams a header row followed by data rows."""
        mock_get_filtered_resources.return_value = [{
            'RESOURCEID': 'RES01',
            'WORKCENTERNAME': 'WC01',
            'RESOURCEFAMILYNAME': 'FAM01',
            'RESOURCENAME': 'RES01',
        }]
        mock_wc_mapping.return_value = {'WC01': {'group': 'WC01', 'sequence': 1}}
        mock_read_sql.return_value = pd.DataFrame([{
            'HISTORYID': 'RES01',
            'PRD_HOURS': 80, 'SBY_HOURS': 10, 'UDT_HOURS': 5,
            'SDT_HOURS': 3, 'EGT_HOURS': 2, 'NST_HOURS': 10,
            'TOTAL_HOURS': 110
        }])
        rows = list(export_csv(
            start_date='2024-01-01',
            end_date='2024-01-07',
        ))
        # At minimum: a header row plus one data row.
        self.assertGreaterEqual(len(rows), 2)
        # The header row must carry the expected column labels.
        self.assertIn('站點', rows[0])
        self.assertIn('OU%', rows[0])
if __name__ == "__main__":
    # Allow running the module directly: python test_resource_history_service.py
    unittest.main()