Files
DashBoard/tests/test_field_contracts.py

128 lines
4.3 KiB
Python

# -*- coding: utf-8 -*-
"""Field contract governance tests."""
from __future__ import annotations
import csv
import io
from unittest.mock import patch
import pandas as pd
from mes_dashboard.config.field_contracts import (
get_page_contract,
get_export_api_keys,
get_export_headers,
)
from mes_dashboard.services.job_query_service import export_jobs_with_history
from mes_dashboard.services.resource_history_service import export_csv as export_resource_history_csv
def test_contract_sections_exist_for_primary_pages():
for page, section in [
('job_query', 'jobs_table'),
('job_query', 'txn_table'),
('job_query', 'export'),
('resource_history', 'detail_table'),
('resource_history', 'export'),
('tables', 'result_table'),
('excel_query', 'result_table'),
('resource_status', 'matrix_summary'),
]:
contract = get_page_contract(page, section)
assert contract, f"missing contract for {page}:{section}"
def test_export_contracts_have_no_duplicate_api_keys():
for page in ['job_query', 'resource_history']:
keys = [field.get('api_key') for field in get_page_contract(page, 'export')]
assert len(keys) == len(set(keys))
def test_export_headers_and_keys_have_same_length():
for page in ['job_query', 'resource_history']:
headers = get_export_headers(page)
keys = get_export_api_keys(page)
assert headers
assert keys
assert len(headers) == len(keys)
def test_all_contract_fields_define_semantic_type():
pages_and_sections = [
('job_query', 'jobs_table'),
('job_query', 'txn_table'),
('job_query', 'export'),
('resource_history', 'detail_table'),
('resource_history', 'kpi'),
('resource_history', 'export'),
('tables', 'result_table'),
('excel_query', 'result_table'),
('resource_status', 'matrix_summary'),
]
for page, section in pages_and_sections:
for field in get_page_contract(page, section):
assert field.get('semantic_type'), f"missing semantic_type in {page}:{section}:{field}"
@patch('mes_dashboard.services.job_query_service.SQLLoader.load', return_value='SELECT 1')
def test_job_query_export_uses_contract_headers(_mock_sql):
export_keys = get_export_api_keys('job_query')
export_headers = get_export_headers('job_query')
row = {key: f'v_{idx}' for idx, key in enumerate(export_keys)}
row['JOB_CREATEDATE'] = pd.Timestamp('2024-01-01 10:00:00')
row['JOB_COMPLETEDATE'] = pd.Timestamp('2024-01-02 10:00:00')
row['TXNDATE'] = pd.Timestamp('2024-01-02 11:00:00')
df = pd.DataFrame([row], columns=export_keys)
with patch('mes_dashboard.services.job_query_service.read_sql_df', return_value=df):
chunks = list(export_jobs_with_history(['R1'], '2024-01-01', '2024-01-10'))
assert chunks
header_chunk = chunks[0].lstrip('\ufeff')
header_row = next(csv.reader(io.StringIO(header_chunk)))
assert header_row == export_headers
@patch('mes_dashboard.services.resource_history_service.SQLLoader.load', return_value='SELECT 1')
@patch('mes_dashboard.services.resource_history_service.read_sql_df')
@patch('mes_dashboard.services.filter_cache.get_workcenter_mapping')
def test_resource_history_export_uses_contract_headers(
mock_wc_mapping,
mock_read_sql,
_mock_sql_loader,
):
export_headers = get_export_headers('resource_history')
mock_wc_mapping.return_value = {
'WC-A': {'group': '站點-A', 'sequence': 1}
}
mock_read_sql.return_value = pd.DataFrame([
{
'HISTORYID': 'RES-A',
'PRD_HOURS': 10,
'SBY_HOURS': 2,
'UDT_HOURS': 1,
'SDT_HOURS': 1,
'EGT_HOURS': 1,
'NST_HOURS': 1,
'TOTAL_HOURS': 16,
}
])
with patch('mes_dashboard.services.resource_history_service._get_filtered_resources', return_value=[
{
'RESOURCEID': 'RES-A',
'WORKCENTERNAME': 'WC-A',
'RESOURCEFAMILYNAME': 'FAM-A',
'RESOURCENAME': 'EQ-A',
}
]):
chunks = list(export_resource_history_csv('2024-01-01', '2024-01-10'))
assert chunks
header_row = next(csv.reader(io.StringIO(chunks[0].lstrip('\ufeff'))))
assert header_row == export_headers