217 lines
7.5 KiB
Python
217 lines
7.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""End-to-end tests for realtime equipment status cache.
|
|
|
|
Tests the full flow from cache sync to API response.
Requires a running server with the --run-e2e flag.
|
|
"""
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
|
|
@pytest.mark.e2e
class TestEquipmentStatusCacheSync:
    """Check that the health endpoint reports cache synchronization state.

    Each test issues a live GET against ``health_url`` (a pytest fixture
    that is not defined in this file) and verifies only that the expected
    keys are present in the JSON payload; the values are not inspected.
    """

    # ``requests`` applies no timeout unless one is passed explicitly, so an
    # unresponsive server would block the whole suite. Bound every call.
    REQUEST_TIMEOUT_SECONDS = 30

    def test_health_check_includes_equipment_status_cache(self, health_url):
        """Health payload exposes an ``equipment_status_cache`` section."""
        response = requests.get(
            health_url, timeout=self.REQUEST_TIMEOUT_SECONDS
        )

        assert response.status_code == 200
        data = response.json()

        # The cache section itself must be present ...
        assert 'equipment_status_cache' in data
        cache_status = data['equipment_status_cache']

        # ... and carry every expected field.
        for field in ('enabled', 'loaded', 'count', 'updated_at'):
            assert field in cache_status

    def test_health_check_includes_workcenter_mapping(self, health_url):
        """Health payload exposes a ``workcenter_mapping`` section."""
        response = requests.get(
            health_url, timeout=self.REQUEST_TIMEOUT_SECONDS
        )

        assert response.status_code == 200
        data = response.json()

        # The mapping section itself must be present ...
        assert 'workcenter_mapping' in data
        wc_status = data['workcenter_mapping']

        # ... and carry every expected field.
        for field in ('loaded', 'workcenter_count', 'group_count'):
            assert field in wc_status
|
|
|
|
|
|
@pytest.mark.e2e
class TestMergedQueryApi:
    """Exercise the merged resource status API endpoints.

    Every test performs a live GET below ``api_base_url`` (a pytest fixture
    that is not defined in this file), expects HTTP 200 with a JSON body
    whose ``success`` flag is ``True``, and then checks the payload shape.
    Structure checks on individual records are skipped when the server
    returns no rows.
    """

    # ``requests`` applies no timeout unless one is passed explicitly, so an
    # unresponsive server would block the whole suite. Bound every call.
    REQUEST_TIMEOUT_SECONDS = 30

    def test_resource_status_endpoint(self, api_base_url):
        """Test /api/resource/status endpoint."""
        url = f"{api_base_url}/resource/status"
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)

        assert response.status_code == 200
        data = response.json()

        assert data['success'] is True
        assert 'data' in data
        assert 'count' in data

        # Only the first record is inspected, and only when rows exist.
        if data['data']:
            record = data['data'][0]
            expected_fields = (
                # Merged resource fields
                'RESOURCEID',
                'RESOURCENAME',
                # Workcenter mapping fields
                'WORKCENTER_GROUP',
                'WORKCENTER_SHORT',
                # Realtime status field
                'STATUS_CATEGORY',
            )
            for field in expected_fields:
                assert field in record

    def test_resource_status_with_workcenter_filter(self, api_base_url):
        """Test /api/resource/status with workcenter_groups filter."""
        url = f"{api_base_url}/resource/status"
        response = requests.get(
            url,
            params={'workcenter_groups': '焊接'},
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )

        assert response.status_code == 200
        data = response.json()

        assert data['success'] is True

        # Every record that carries a group must be in the requested one.
        for record in data['data']:
            # The group may be missing/None if no mapping was found, in
            # which case the record is not checked.
            if record.get('WORKCENTER_GROUP'):
                assert record['WORKCENTER_GROUP'] == '焊接'

    def test_resource_status_with_production_filter(self, api_base_url):
        """Test /api/resource/status with is_production filter."""
        url = f"{api_base_url}/resource/status"
        response = requests.get(
            url,
            params={'is_production': 'true'},
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )

        assert response.status_code == 200
        data = response.json()

        assert data['success'] is True

    def test_resource_status_with_status_category_filter(self, api_base_url):
        """Test /api/resource/status with status_categories filter."""
        url = f"{api_base_url}/resource/status"
        response = requests.get(
            url,
            params={'status_categories': 'PRODUCTIVE,DOWN'},
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )

        assert response.status_code == 200
        data = response.json()

        assert data['success'] is True

        # Every record that carries a category must match the filter;
        # records with a missing/empty category are not checked.
        allowed_categories = ('PRODUCTIVE', 'DOWN')
        for record in data['data']:
            if record.get('STATUS_CATEGORY'):
                assert record['STATUS_CATEGORY'] in allowed_categories

    def test_resource_status_summary_endpoint(self, api_base_url):
        """Test /api/resource/status/summary endpoint."""
        url = f"{api_base_url}/resource/status/summary"
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)

        assert response.status_code == 200
        data = response.json()

        assert data['success'] is True
        assert 'data' in data

        summary = data['data']
        expected_fields = (
            'total_count',
            'by_status_category',
            'by_workcenter_group',
            'with_active_job',
            'with_wip',
        )
        for field in expected_fields:
            assert field in summary

    def test_resource_status_matrix_endpoint(self, api_base_url):
        """Test /api/resource/status/matrix endpoint."""
        url = f"{api_base_url}/resource/status/matrix"
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)

        assert response.status_code == 200
        data = response.json()

        assert data['success'] is True
        assert 'data' in data

        # Only the first row is inspected, and only when rows exist.
        if data['data']:
            row = data['data'][0]
            expected_columns = (
                'workcenter_group',
                'workcenter_sequence',
                'total',
                # Standard status columns
                'PRD',
                'SBY',
                'UDT',
                'SDT',
                'EGT',
                'NST',
                'OTHER',
            )
            for column in expected_columns:
                assert column in row
|
|
|
|
|
|
@pytest.mark.e2e
class TestFilterOptionsIncludeNewFields:
    """Check that the filter options API includes the new fields.

    The test performs a live GET below ``api_base_url`` (a pytest fixture
    that is not defined in this file).
    """

    # ``requests`` applies no timeout unless one is passed explicitly, so an
    # unresponsive server would block the whole suite. Bound every call.
    REQUEST_TIMEOUT_SECONDS = 30

    def test_status_options_endpoint(self, api_base_url):
        """Test /api/resource/status/options endpoint."""
        url = f"{api_base_url}/resource/status/options"
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)

        assert response.status_code == 200
        data = response.json()

        assert data['success'] is True
        assert 'data' in data

        options = data['data']

        # Both option lists must be present and must be JSON arrays.
        for option_name in ('workcenter_groups', 'status_categories'):
            assert option_name in options
            assert isinstance(options[option_name], list)
|
|
|
|
|
|
@pytest.mark.e2e
@pytest.mark.redis
class TestCacheIntegration:
    """Test cache integration (requires Redis).

    Compares the record count reported by the health endpoint with the
    count returned by the resource status API. ``api_base_url`` and
    ``health_url`` are pytest fixtures that are not defined in this file.
    """

    # ``requests`` applies no timeout unless one is passed explicitly, so an
    # unresponsive server would block the whole suite. Bound every call.
    REQUEST_TIMEOUT_SECONDS = 30

    def test_cache_data_consistency(self, api_base_url, health_url):
        """Test cache data is consistent between health and API.

        The test is skipped when the health payload reports the equipment
        status cache as not enabled or not loaded.
        """
        # Read the cache state from the health endpoint.
        health_resp = requests.get(
            health_url, timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        health_data = health_resp.json()

        cache_status = health_data.get('equipment_status_cache', {})

        if not cache_status.get('enabled') or not cache_status.get('loaded'):
            pytest.skip("Equipment status cache not enabled or loaded")

        cache_count = cache_status.get('count', 0)

        # Fetch all equipment status records through the API.
        api_resp = requests.get(
            f"{api_base_url}/resource/status",
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        # Without this check an error payload that lacks 'count' would
        # default to 0 below and let the comparison pass vacuously.
        assert api_resp.status_code == 200
        api_data = api_resp.json()

        api_count = api_data.get('count', 0)

        # The API may have filters applied from resource-cache, so it can
        # return fewer records than the cache holds, but it should never
        # exceed the cache count. A cache count of 0 is accepted as-is.
        assert api_count <= cache_count or cache_count == 0
|