Fix DWH schema prefixes and table fields

This commit is contained in:
beabigegg
2026-01-29 17:49:35 +08:00
parent 1510646a36
commit 23e105e92c
12 changed files with 5516 additions and 5516 deletions

View File

@@ -8,22 +8,22 @@ Row counts updated from data/table_schema_info.json (2026-01-29)
TABLES_CONFIG = {
'即時數據表 (View)': [
{
'name': 'DW_MES_LOT_V',
'display_name': 'WIP 即時批次 (DW_MES_LOT_V)',
'name': 'DWH.DW_MES_LOT_V',
'display_name': 'WIP 即時批次 (DWH.DW_MES_LOT_V)',
'row_count': 9468, # 動態變化,約 9000-12000
'time_field': 'SYS_DATE',
'description': 'MES 即時 WIP View - 每 5 分鐘更新包含完整批次狀態、工站、設備、Hold 原因等 70 欄位'
},
{
'name': 'DW_MES_EQUIPMENTSTATUS_WIP_V',
'display_name': '設備狀態+WIP 視圖 (DW_MES_EQUIPMENTSTATUS_WIP_V)',
'name': 'DWH.DW_MES_EQUIPMENTSTATUS_WIP_V',
'display_name': '設備狀態+WIP 視圖 (DWH.DW_MES_EQUIPMENTSTATUS_WIP_V)',
'row_count': 2631,
'time_field': None,
'description': '設備狀態與 WIP 關聯視圖 - 設備當前狀態、維修工單、資產狀態等 32 欄位'
},
{
'name': 'DW_MES_SPEC_WORKCENTER_V',
'display_name': '規格工站對照 (DW_MES_SPEC_WORKCENTER_V)',
'name': 'DWH.DW_MES_SPEC_WORKCENTER_V',
'display_name': '規格工站對照 (DWH.DW_MES_SPEC_WORKCENTER_V)',
'row_count': 230,
'time_field': None,
'description': '規格與工站對照視圖 - 規格順序、工站群組、工站順序等 9 欄位'
@@ -31,29 +31,29 @@ TABLES_CONFIG = {
],
'現況快照表': [
{
'name': 'DW_MES_WIP',
'display_name': 'WIP (在制品表)',
'name': 'DWH.DW_MES_WIP',
'display_name': 'WIP (DWH.DW_MES_WIP)',
'row_count': 79058085,
'time_field': 'TXNDATE',
'description': '在製品現況表(含歷史累積)- 當前 WIP 狀態/數量'
},
{
'name': 'DW_MES_RESOURCE',
'display_name': 'RESOURCE (資源主檔)',
'name': 'DWH.DW_MES_RESOURCE',
'display_name': 'RESOURCE (DWH.DW_MES_RESOURCE)',
'row_count': 91329,
'time_field': None,
'description': '資源表 - 設備/載具等資源基本資料OBJECTCATEGORY=ASSEMBLY 時RESOURCENAME 為設備編號)'
},
{
'name': 'DW_MES_CONTAINER',
'display_name': 'CONTAINER (容器信息表)',
'name': 'DWH.DW_MES_CONTAINER',
'display_name': 'CONTAINER (DWH.DW_MES_CONTAINER)',
'row_count': 5218406,
'time_field': 'LASTMOVEOUTTIMESTAMP',
'description': '容器/批次主檔 - 目前在製容器狀態、數量與流程資訊'
},
{
'name': 'DW_MES_JOB',
'display_name': 'JOB (設備維修工單)',
'name': 'DWH.DW_MES_JOB',
'display_name': 'JOB (DWH.DW_MES_JOB)',
'row_count': 1248622,
'time_field': 'CREATEDATE',
'description': '設備維修工單表 - 維修工單的當前狀態與流程'
@@ -61,71 +61,71 @@ TABLES_CONFIG = {
],
'歷史累積表': [
{
'name': 'DW_MES_RESOURCESTATUS',
'display_name': 'RESOURCESTATUS (資源狀態歷史)',
'name': 'DWH.DW_MES_RESOURCESTATUS',
'display_name': 'RESOURCESTATUS (DWH.DW_MES_RESOURCESTATUS)',
'row_count': 65742614,
'time_field': 'OLDLASTSTATUSCHANGEDATE',
'description': '設備狀態變更歷史表 - 狀態切換與原因'
},
{
'name': 'DW_MES_RESOURCESTATUS_SHIFT',
'display_name': 'RESOURCESTATUS_SHIFT (資源班次狀態)',
'name': 'DWH.DW_MES_RESOURCESTATUS_SHIFT',
'display_name': 'RESOURCESTATUS_SHIFT (DWH.DW_MES_RESOURCESTATUS_SHIFT)',
'row_count': 74820134,
'time_field': 'DATADATE',
'description': '設備狀態班次彙總表 - 班次級狀態/工時'
},
{
'name': 'DW_MES_LOTWIPHISTORY',
'display_name': 'LOTWIPHISTORY (批次流轉歷史)',
'name': 'DWH.DW_MES_LOTWIPHISTORY',
'display_name': 'LOTWIPHISTORY (DWH.DW_MES_LOTWIPHISTORY)',
'row_count': 53454213,
'time_field': 'TRACKINTIMESTAMP',
'description': '在製流轉歷史表 - 批次進出站與流程軌跡'
},
{
'name': 'DW_MES_LOTWIPDATAHISTORY',
'display_name': 'LOTWIPDATAHISTORY (批次數據歷史)',
'name': 'DWH.DW_MES_LOTWIPDATAHISTORY',
'display_name': 'LOTWIPDATAHISTORY (DWH.DW_MES_LOTWIPDATAHISTORY)',
'row_count': 77960216,
'time_field': 'TXNTIMESTAMP',
'description': '在製數據採集歷史表 - 製程量測/參數紀錄'
},
{
'name': 'DW_MES_HM_LOTMOVEOUT',
'display_name': 'HM_LOTMOVEOUT (批次移出表)',
'name': 'DWH.DW_MES_HM_LOTMOVEOUT',
'display_name': 'HM_LOTMOVEOUT (DWH.DW_MES_HM_LOTMOVEOUT)',
'row_count': 48645692,
'time_field': 'TXNDATE',
'description': '批次出站事件歷史表 - 出站/移出交易'
},
{
'name': 'DW_MES_JOBTXNHISTORY',
'display_name': 'JOBTXNHISTORY (維修工單交易歷史)',
'name': 'DWH.DW_MES_JOBTXNHISTORY',
'display_name': 'JOBTXNHISTORY (DWH.DW_MES_JOBTXNHISTORY)',
'row_count': 9554723,
'time_field': 'TXNDATE',
'description': '維修工單交易歷史表 - 工單狀態變更紀錄'
},
{
'name': 'DW_MES_LOTREJECTHISTORY',
'display_name': 'LOTREJECTHISTORY (批次拒絕歷史)',
'name': 'DWH.DW_MES_LOTREJECTHISTORY',
'display_name': 'LOTREJECTHISTORY (DWH.DW_MES_LOTREJECTHISTORY)',
'row_count': 15786025,
'time_field': 'CREATEDATE',
'time_field': 'TXNDATE',
'description': '批次不良/報廢歷史表 - 不良原因與數量'
},
{
'name': 'DW_MES_LOTMATERIALSHISTORY',
'display_name': 'LOTMATERIALSHISTORY (物料消耗歷史)',
'name': 'DWH.DW_MES_LOTMATERIALSHISTORY',
'display_name': 'LOTMATERIALSHISTORY (DWH.DW_MES_LOTMATERIALSHISTORY)',
'row_count': 17829931,
'time_field': 'CREATEDATE',
'time_field': 'TXNDATE',
'description': '批次物料消耗歷史表 - 用料與批次關聯'
},
{
'name': 'DW_MES_HOLDRELEASEHISTORY',
'display_name': 'HOLDRELEASEHISTORY (Hold/Release歷史)',
'name': 'DWH.DW_MES_HOLDRELEASEHISTORY',
'display_name': 'HOLDRELEASEHISTORY (DWH.DW_MES_HOLDRELEASEHISTORY)',
'row_count': 310737,
'time_field': 'HOLDTXNDATE',
'description': 'Hold/Release 歷史表 - 批次停工與解除紀錄'
},
{
'name': 'DW_MES_MAINTENANCE',
'display_name': 'MAINTENANCE (設備維護歷史)',
'name': 'DWH.DW_MES_MAINTENANCE',
'display_name': 'MAINTENANCE (DWH.DW_MES_MAINTENANCE)',
'row_count': 52060026,
'time_field': 'TXNDATE',
'description': '設備保養/維護紀錄表 - 保養計畫與點檢數據'
@@ -133,15 +133,15 @@ TABLES_CONFIG = {
],
'輔助表': [
{
'name': 'DW_MES_PARTREQUESTORDER',
'display_name': 'PARTREQUESTORDER (物料請求訂單)',
'name': 'DWH.DW_MES_PARTREQUESTORDER',
'display_name': 'PARTREQUESTORDER (DWH.DW_MES_PARTREQUESTORDER)',
'row_count': 61396,
'time_field': None,
'description': '維修用料請求表 - 維修/設備零件請領'
},
{
'name': 'DW_MES_PJ_COMBINEDASSYLOTS',
'display_name': 'PJ_COMBINEDASSYLOTS (組合裝配批次)',
'name': 'DWH.DW_MES_PJ_COMBINEDASSYLOTS',
'display_name': 'PJ_COMBINEDASSYLOTS (DWH.DW_MES_PJ_COMBINEDASSYLOTS)',
'row_count': 1965425,
'time_field': None,
'description': '併批紀錄表 - 合批/合併批次關聯與數量資訊'

View File

@@ -84,7 +84,7 @@ def get_cached_wip_data() -> Optional[pd.DataFrame]:
"""Get cached WIP data from Redis.
Returns:
DataFrame with full DW_MES_LOT_V data, or None if cache miss.
DataFrame with full DWH.DW_MES_LOT_V data, or None if cache miss.
"""
if not REDIS_ENABLED:
return None

View File

@@ -1,308 +1,308 @@
# -*- coding: utf-8 -*-
"""Background task for updating WIP and Resource cache from Oracle to Redis."""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from datetime import datetime
from typing import Optional
import pandas as pd
from mes_dashboard.core.redis_client import (
get_redis_client,
get_key,
redis_available,
REDIS_ENABLED
)
from mes_dashboard.core.database import read_sql_df
logger = logging.getLogger('mes_dashboard.cache_updater')
# ============================================================
# Configuration
# ============================================================
CACHE_CHECK_INTERVAL = int(os.getenv('CACHE_CHECK_INTERVAL', '600')) # 10 minutes
WIP_VIEW = "DW_MES_LOT_V"
# Resource cache sync interval (default: 4 hours)
RESOURCE_SYNC_INTERVAL = int(os.getenv('RESOURCE_SYNC_INTERVAL', '14400'))
# ============================================================
# Cache Updater Class
# ============================================================
class CacheUpdater:
"""Background task that periodically checks SYS_DATE and updates cache."""
def __init__(self, interval: int = CACHE_CHECK_INTERVAL):
"""Initialize cache updater.
Args:
interval: Check interval in seconds (default: 600)
"""
self.interval = interval
self.resource_sync_interval = RESOURCE_SYNC_INTERVAL
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._is_running = False
self._last_resource_sync: Optional[float] = None
def start(self) -> None:
"""Start the background update thread."""
if not REDIS_ENABLED:
logger.info("Redis is disabled, cache updater will not start")
return
if self._thread is not None and self._thread.is_alive():
logger.warning("Cache updater is already running")
return
self._stop_event.clear()
self._thread = threading.Thread(
target=self._worker,
daemon=True,
name="cache-updater"
)
self._thread.start()
self._is_running = True
logger.info(f"Cache updater started (interval: {self.interval}s)")
def stop(self) -> None:
"""Stop the background update thread."""
if self._thread is None or not self._thread.is_alive():
return
self._stop_event.set()
self._thread.join(timeout=5)
self._is_running = False
logger.info("Cache updater stopped")
def is_running(self) -> bool:
"""Check if the updater is running."""
return self._is_running and self._thread is not None and self._thread.is_alive()
def force_update(self) -> bool:
"""Force an immediate cache update.
Returns:
True if update was successful.
"""
return self._check_and_update(force=True)
def _worker(self) -> None:
"""Background worker that runs the update loop."""
# Initial update on startup
logger.info("Performing initial cache load...")
self._check_and_update(force=True)
# Initial resource cache load
self._check_resource_update(force=True)
# Periodic updates
while not self._stop_event.wait(self.interval):
try:
self._check_and_update()
self._check_resource_update()
except Exception as e:
logger.error(f"Cache update failed: {e}", exc_info=True)
def _check_and_update(self, force: bool = False) -> bool:
"""Check SYS_DATE and update cache if needed.
Args:
force: If True, update regardless of SYS_DATE.
Returns:
True if cache was updated.
"""
if not redis_available():
logger.warning("Redis not available, skipping cache update")
return False
try:
# Get current SYS_DATE from Oracle
oracle_sys_date = self._check_sys_date()
if oracle_sys_date is None:
logger.error("Failed to get SYS_DATE from Oracle")
return False
# Get cached SYS_DATE from Redis
cached_sys_date = self._get_cached_sys_date()
# Compare and decide whether to update
if not force and cached_sys_date == oracle_sys_date:
logger.debug(f"SYS_DATE unchanged ({oracle_sys_date}), skipping update")
return False
logger.info(f"SYS_DATE changed: {cached_sys_date} -> {oracle_sys_date}, updating cache...")
# Load full table and update Redis
df = self._load_full_table()
if df is None or df.empty:
logger.error("Failed to load data from Oracle")
return False
success = self._update_redis_cache(df, oracle_sys_date)
if success:
logger.info(f"Cache updated successfully ({len(df)} rows)")
return success
except Exception as e:
logger.error(f"Error in cache update: {e}", exc_info=True)
return False
def _check_sys_date(self) -> Optional[str]:
"""Query Oracle for MAX(SYS_DATE).
Returns:
SYS_DATE string or None if query failed.
"""
sql = f"SELECT MAX(SYS_DATE) as SYS_DATE FROM {WIP_VIEW}"
try:
df = read_sql_df(sql)
if df is not None and not df.empty:
sys_date = df.iloc[0]['SYS_DATE']
return str(sys_date) if sys_date else None
return None
except Exception as e:
logger.error(f"Failed to query SYS_DATE: {e}")
return None
def _get_cached_sys_date(self) -> Optional[str]:
"""Get cached SYS_DATE from Redis.
Returns:
Cached SYS_DATE string or None.
"""
client = get_redis_client()
if client is None:
return None
try:
return client.get(get_key("meta:sys_date"))
except Exception as e:
logger.warning(f"Failed to get cached SYS_DATE: {e}")
return None
def _load_full_table(self) -> Optional[pd.DataFrame]:
"""Load entire DW_MES_LOT_V table from Oracle.
Returns:
DataFrame with all rows, or None if failed.
"""
sql = f"""
SELECT *
FROM {WIP_VIEW}
WHERE WORKORDER IS NOT NULL
"""
try:
df = read_sql_df(sql)
return df
except Exception as e:
logger.error(f"Failed to load full table: {e}")
return None
def _update_redis_cache(self, df: pd.DataFrame, sys_date: str) -> bool:
"""Update Redis cache with new data using pipeline for atomicity.
Args:
df: DataFrame with full table data.
sys_date: Current SYS_DATE from Oracle.
Returns:
True if update was successful.
"""
client = get_redis_client()
if client is None:
return False
try:
# Convert DataFrame to JSON
# Handle datetime columns
for col in df.select_dtypes(include=['datetime64']).columns:
df[col] = df[col].astype(str)
data_json = df.to_json(orient='records', force_ascii=False)
# Atomic update using pipeline
now = datetime.now().isoformat()
pipe = client.pipeline()
pipe.set(get_key("data"), data_json)
pipe.set(get_key("meta:sys_date"), sys_date)
pipe.set(get_key("meta:updated_at"), now)
pipe.execute()
return True
except Exception as e:
logger.error(f"Failed to update Redis cache: {e}")
return False
def _check_resource_update(self, force: bool = False) -> bool:
"""Check and update resource cache if needed.
Args:
force: If True, update regardless of interval.
Returns:
True if cache was updated.
"""
from mes_dashboard.services.resource_cache import (
refresh_cache as refresh_resource_cache,
RESOURCE_CACHE_ENABLED,
)
if not RESOURCE_CACHE_ENABLED:
return False
# Check if sync is needed based on interval
now = time.time()
if not force and self._last_resource_sync is not None:
elapsed = now - self._last_resource_sync
if elapsed < self.resource_sync_interval:
logger.debug(
f"Resource sync not due yet ({elapsed:.0f}s < {self.resource_sync_interval}s)"
)
return False
# Perform sync
logger.info("Checking resource cache for updates...")
try:
updated = refresh_resource_cache(force=force)
self._last_resource_sync = now
return updated
except Exception as e:
logger.error(f"Resource cache update failed: {e}", exc_info=True)
return False
# ============================================================
# Global Instance
# ============================================================
_CACHE_UPDATER: Optional[CacheUpdater] = None
def get_cache_updater() -> CacheUpdater:
"""Get or create the global cache updater instance."""
global _CACHE_UPDATER
if _CACHE_UPDATER is None:
_CACHE_UPDATER = CacheUpdater()
return _CACHE_UPDATER
def start_cache_updater() -> None:
"""Start the global cache updater."""
get_cache_updater().start()
def stop_cache_updater() -> None:
"""Stop the global cache updater."""
if _CACHE_UPDATER is not None:
_CACHE_UPDATER.stop()
# -*- coding: utf-8 -*-
"""Background task for updating WIP and Resource cache from Oracle to Redis."""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from datetime import datetime
from typing import Optional
import pandas as pd
from mes_dashboard.core.redis_client import (
get_redis_client,
get_key,
redis_available,
REDIS_ENABLED
)
from mes_dashboard.core.database import read_sql_df
logger = logging.getLogger('mes_dashboard.cache_updater')
# ============================================================
# Configuration
# ============================================================
CACHE_CHECK_INTERVAL = int(os.getenv('CACHE_CHECK_INTERVAL', '600')) # 10 minutes
WIP_VIEW = "DWH.DW_MES_LOT_V"
# Resource cache sync interval (default: 4 hours)
RESOURCE_SYNC_INTERVAL = int(os.getenv('RESOURCE_SYNC_INTERVAL', '14400'))
# ============================================================
# Cache Updater Class
# ============================================================
class CacheUpdater:
"""Background task that periodically checks SYS_DATE and updates cache."""
def __init__(self, interval: int = CACHE_CHECK_INTERVAL):
"""Initialize cache updater.
Args:
interval: Check interval in seconds (default: 600)
"""
self.interval = interval
self.resource_sync_interval = RESOURCE_SYNC_INTERVAL
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._is_running = False
self._last_resource_sync: Optional[float] = None
def start(self) -> None:
"""Start the background update thread."""
if not REDIS_ENABLED:
logger.info("Redis is disabled, cache updater will not start")
return
if self._thread is not None and self._thread.is_alive():
logger.warning("Cache updater is already running")
return
self._stop_event.clear()
self._thread = threading.Thread(
target=self._worker,
daemon=True,
name="cache-updater"
)
self._thread.start()
self._is_running = True
logger.info(f"Cache updater started (interval: {self.interval}s)")
def stop(self) -> None:
"""Stop the background update thread."""
if self._thread is None or not self._thread.is_alive():
return
self._stop_event.set()
self._thread.join(timeout=5)
self._is_running = False
logger.info("Cache updater stopped")
def is_running(self) -> bool:
"""Check if the updater is running."""
return self._is_running and self._thread is not None and self._thread.is_alive()
def force_update(self) -> bool:
"""Force an immediate cache update.
Returns:
True if update was successful.
"""
return self._check_and_update(force=True)
def _worker(self) -> None:
"""Background worker that runs the update loop."""
# Initial update on startup
logger.info("Performing initial cache load...")
self._check_and_update(force=True)
# Initial resource cache load
self._check_resource_update(force=True)
# Periodic updates
while not self._stop_event.wait(self.interval):
try:
self._check_and_update()
self._check_resource_update()
except Exception as e:
logger.error(f"Cache update failed: {e}", exc_info=True)
def _check_and_update(self, force: bool = False) -> bool:
"""Check SYS_DATE and update cache if needed.
Args:
force: If True, update regardless of SYS_DATE.
Returns:
True if cache was updated.
"""
if not redis_available():
logger.warning("Redis not available, skipping cache update")
return False
try:
# Get current SYS_DATE from Oracle
oracle_sys_date = self._check_sys_date()
if oracle_sys_date is None:
logger.error("Failed to get SYS_DATE from Oracle")
return False
# Get cached SYS_DATE from Redis
cached_sys_date = self._get_cached_sys_date()
# Compare and decide whether to update
if not force and cached_sys_date == oracle_sys_date:
logger.debug(f"SYS_DATE unchanged ({oracle_sys_date}), skipping update")
return False
logger.info(f"SYS_DATE changed: {cached_sys_date} -> {oracle_sys_date}, updating cache...")
# Load full table and update Redis
df = self._load_full_table()
if df is None or df.empty:
logger.error("Failed to load data from Oracle")
return False
success = self._update_redis_cache(df, oracle_sys_date)
if success:
logger.info(f"Cache updated successfully ({len(df)} rows)")
return success
except Exception as e:
logger.error(f"Error in cache update: {e}", exc_info=True)
return False
def _check_sys_date(self) -> Optional[str]:
"""Query Oracle for MAX(SYS_DATE).
Returns:
SYS_DATE string or None if query failed.
"""
sql = f"SELECT MAX(SYS_DATE) as SYS_DATE FROM {WIP_VIEW}"
try:
df = read_sql_df(sql)
if df is not None and not df.empty:
sys_date = df.iloc[0]['SYS_DATE']
return str(sys_date) if sys_date else None
return None
except Exception as e:
logger.error(f"Failed to query SYS_DATE: {e}")
return None
def _get_cached_sys_date(self) -> Optional[str]:
"""Get cached SYS_DATE from Redis.
Returns:
Cached SYS_DATE string or None.
"""
client = get_redis_client()
if client is None:
return None
try:
return client.get(get_key("meta:sys_date"))
except Exception as e:
logger.warning(f"Failed to get cached SYS_DATE: {e}")
return None
def _load_full_table(self) -> Optional[pd.DataFrame]:
"""Load entire DWH.DW_MES_LOT_V table from Oracle.
Returns:
DataFrame with all rows, or None if failed.
"""
sql = f"""
SELECT *
FROM {WIP_VIEW}
WHERE WORKORDER IS NOT NULL
"""
try:
df = read_sql_df(sql)
return df
except Exception as e:
logger.error(f"Failed to load full table: {e}")
return None
def _update_redis_cache(self, df: pd.DataFrame, sys_date: str) -> bool:
"""Update Redis cache with new data using pipeline for atomicity.
Args:
df: DataFrame with full table data.
sys_date: Current SYS_DATE from Oracle.
Returns:
True if update was successful.
"""
client = get_redis_client()
if client is None:
return False
try:
# Convert DataFrame to JSON
# Handle datetime columns
for col in df.select_dtypes(include=['datetime64']).columns:
df[col] = df[col].astype(str)
data_json = df.to_json(orient='records', force_ascii=False)
# Atomic update using pipeline
now = datetime.now().isoformat()
pipe = client.pipeline()
pipe.set(get_key("data"), data_json)
pipe.set(get_key("meta:sys_date"), sys_date)
pipe.set(get_key("meta:updated_at"), now)
pipe.execute()
return True
except Exception as e:
logger.error(f"Failed to update Redis cache: {e}")
return False
def _check_resource_update(self, force: bool = False) -> bool:
"""Check and update resource cache if needed.
Args:
force: If True, update regardless of interval.
Returns:
True if cache was updated.
"""
from mes_dashboard.services.resource_cache import (
refresh_cache as refresh_resource_cache,
RESOURCE_CACHE_ENABLED,
)
if not RESOURCE_CACHE_ENABLED:
return False
# Check if sync is needed based on interval
now = time.time()
if not force and self._last_resource_sync is not None:
elapsed = now - self._last_resource_sync
if elapsed < self.resource_sync_interval:
logger.debug(
f"Resource sync not due yet ({elapsed:.0f}s < {self.resource_sync_interval}s)"
)
return False
# Perform sync
logger.info("Checking resource cache for updates...")
try:
updated = refresh_resource_cache(force=force)
self._last_resource_sync = now
return updated
except Exception as e:
logger.error(f"Resource cache update failed: {e}", exc_info=True)
return False
# ============================================================
# Global Instance
# ============================================================
_CACHE_UPDATER: Optional[CacheUpdater] = None
def get_cache_updater() -> CacheUpdater:
"""Get or create the global cache updater instance."""
global _CACHE_UPDATER
if _CACHE_UPDATER is None:
_CACHE_UPDATER = CacheUpdater()
return _CACHE_UPDATER
def start_cache_updater() -> None:
"""Start the global cache updater."""
get_cache_updater().start()
def stop_cache_updater() -> None:
"""Stop the global cache updater."""
if _CACHE_UPDATER is not None:
_CACHE_UPDATER.stop()

View File

@@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
"""API routes for Excel batch query functionality.
Provides endpoints for:
- Excel file upload and parsing
- Column value extraction
- Batch query execution
- CSV export
"""
# -*- coding: utf-8 -*-
"""API routes for Excel batch query functionality.
Provides endpoints for:
- Excel file upload and parsing
- Column value extraction
- Batch query execution
- CSV export
"""
from flask import Blueprint, jsonify, request, Response
from mes_dashboard.config.tables import TABLES_CONFIG
@@ -16,178 +16,178 @@ from mes_dashboard.services.excel_query_service import (
parse_excel,
get_column_unique_values,
execute_batch_query,
generate_csv_content,
)
excel_query_bp = Blueprint('excel_query', __name__, url_prefix='/api/excel-query')
# Store uploaded Excel data in memory (session-based in production)
_uploaded_excel_cache = {}
@excel_query_bp.route('/upload', methods=['POST'])
def upload_excel():
"""Upload and parse Excel file.
Returns column list and preview data.
"""
if 'file' not in request.files:
return jsonify({'error': '未選擇檔案'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '未選擇檔案'}), 400
# Check file extension
allowed_extensions = {'.xlsx', '.xls'}
import os
ext = os.path.splitext(file.filename)[1].lower()
if ext not in allowed_extensions:
return jsonify({'error': '只支援 .xlsx 或 .xls 檔案'}), 400
# Parse Excel
result = parse_excel(file)
if 'error' in result:
return jsonify(result), 400
# Cache the file content for later use
file.seek(0)
_uploaded_excel_cache['current'] = file.read()
_uploaded_excel_cache['filename'] = file.filename
return jsonify(result)
@excel_query_bp.route('/column-values', methods=['POST'])
def get_column_values():
"""Get unique values from selected Excel column."""
data = request.get_json()
column_name = data.get('column_name')
if not column_name:
return jsonify({'error': '請指定欄位名稱'}), 400
if 'current' not in _uploaded_excel_cache:
return jsonify({'error': '請先上傳 Excel 檔案'}), 400
# Create file-like object from cached content
import io
file_like = io.BytesIO(_uploaded_excel_cache['current'])
result = get_column_unique_values(file_like, column_name)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
@excel_query_bp.route('/tables', methods=['GET'])
def get_tables():
"""Get available tables for querying."""
tables = []
for category, table_list in TABLES_CONFIG.items():
for table in table_list:
tables.append({
'name': table['name'],
'display_name': table['display_name'],
'category': category
})
return jsonify({'tables': tables})
@excel_query_bp.route('/table-columns', methods=['POST'])
def get_table_cols():
"""Get columns for a specific table."""
data = request.get_json()
table_name = data.get('table_name')
if not table_name:
return jsonify({'error': '請指定資料表名稱'}), 400
columns = get_table_columns(table_name)
if not columns:
return jsonify({'error': f'無法取得資料表 {table_name} 的欄位'}), 400
return jsonify({'columns': columns})
@excel_query_bp.route('/execute', methods=['POST'])
def execute_query():
"""Execute batch query with Excel values.
Expects JSON body:
{
"table_name": "DW_MES_WIP",
"search_column": "LOT_ID",
"return_columns": ["LOT_ID", "SPEC", "QTY"],
"search_values": ["val1", "val2", ...]
}
"""
data = request.get_json()
table_name = data.get('table_name')
search_column = data.get('search_column')
return_columns = data.get('return_columns')
search_values = data.get('search_values')
# Validation
if not table_name:
return jsonify({'error': '請指定資料表'}), 400
if not search_column:
return jsonify({'error': '請指定查詢欄位'}), 400
if not return_columns or not isinstance(return_columns, list):
return jsonify({'error': '請指定回傳欄位'}), 400
if not search_values or not isinstance(search_values, list):
return jsonify({'error': '無查詢值'}), 400
result = execute_batch_query(
table_name=table_name,
search_column=search_column,
return_columns=return_columns,
search_values=search_values
)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
@excel_query_bp.route('/export-csv', methods=['POST'])
def export_csv():
"""Export query results as CSV file.
Same parameters as /execute endpoint.
"""
data = request.get_json()
table_name = data.get('table_name')
search_column = data.get('search_column')
return_columns = data.get('return_columns')
search_values = data.get('search_values')
# Validation
if not all([table_name, search_column, return_columns, search_values]):
return jsonify({'error': '缺少必要參數'}), 400
result = execute_batch_query(
table_name=table_name,
search_column=search_column,
return_columns=return_columns,
search_values=search_values
)
if 'error' in result:
return jsonify(result), 400
# Generate CSV
csv_content = generate_csv_content(result['data'], result['columns'])
return Response(
csv_content,
mimetype='text/csv; charset=utf-8',
headers={
'Content-Disposition': 'attachment; filename=query_result.csv'
}
)
generate_csv_content,
)
excel_query_bp = Blueprint('excel_query', __name__, url_prefix='/api/excel-query')
# Store uploaded Excel data in memory (session-based in production)
_uploaded_excel_cache = {}
@excel_query_bp.route('/upload', methods=['POST'])
def upload_excel():
"""Upload and parse Excel file.
Returns column list and preview data.
"""
if 'file' not in request.files:
return jsonify({'error': '未選擇檔案'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '未選擇檔案'}), 400
# Check file extension
allowed_extensions = {'.xlsx', '.xls'}
import os
ext = os.path.splitext(file.filename)[1].lower()
if ext not in allowed_extensions:
return jsonify({'error': '只支援 .xlsx 或 .xls 檔案'}), 400
# Parse Excel
result = parse_excel(file)
if 'error' in result:
return jsonify(result), 400
# Cache the file content for later use
file.seek(0)
_uploaded_excel_cache['current'] = file.read()
_uploaded_excel_cache['filename'] = file.filename
return jsonify(result)
@excel_query_bp.route('/column-values', methods=['POST'])
def get_column_values():
"""Get unique values from selected Excel column."""
data = request.get_json()
column_name = data.get('column_name')
if not column_name:
return jsonify({'error': '請指定欄位名稱'}), 400
if 'current' not in _uploaded_excel_cache:
return jsonify({'error': '請先上傳 Excel 檔案'}), 400
# Create file-like object from cached content
import io
file_like = io.BytesIO(_uploaded_excel_cache['current'])
result = get_column_unique_values(file_like, column_name)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
@excel_query_bp.route('/tables', methods=['GET'])
def get_tables():
"""Get available tables for querying."""
tables = []
for category, table_list in TABLES_CONFIG.items():
for table in table_list:
tables.append({
'name': table['name'],
'display_name': table['display_name'],
'category': category
})
return jsonify({'tables': tables})
@excel_query_bp.route('/table-columns', methods=['POST'])
def get_table_cols():
"""Get columns for a specific table."""
data = request.get_json()
table_name = data.get('table_name')
if not table_name:
return jsonify({'error': '請指定資料表名稱'}), 400
columns = get_table_columns(table_name)
if not columns:
return jsonify({'error': f'無法取得資料表 {table_name} 的欄位'}), 400
return jsonify({'columns': columns})
@excel_query_bp.route('/execute', methods=['POST'])
def execute_query():
"""Execute batch query with Excel values.
Expects JSON body:
{
"table_name": "DWH.DW_MES_WIP",
"search_column": "LOT_ID",
"return_columns": ["LOT_ID", "SPEC", "QTY"],
"search_values": ["val1", "val2", ...]
}
"""
data = request.get_json()
table_name = data.get('table_name')
search_column = data.get('search_column')
return_columns = data.get('return_columns')
search_values = data.get('search_values')
# Validation
if not table_name:
return jsonify({'error': '請指定資料表'}), 400
if not search_column:
return jsonify({'error': '請指定查詢欄位'}), 400
if not return_columns or not isinstance(return_columns, list):
return jsonify({'error': '請指定回傳欄位'}), 400
if not search_values or not isinstance(search_values, list):
return jsonify({'error': '無查詢值'}), 400
result = execute_batch_query(
table_name=table_name,
search_column=search_column,
return_columns=return_columns,
search_values=search_values
)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
@excel_query_bp.route('/export-csv', methods=['POST'])
def export_csv():
"""Export query results as CSV file.
Same parameters as /execute endpoint.
"""
data = request.get_json()
table_name = data.get('table_name')
search_column = data.get('search_column')
return_columns = data.get('return_columns')
search_values = data.get('search_values')
# Validation
if not all([table_name, search_column, return_columns, search_values]):
return jsonify({'error': '缺少必要參數'}), 400
result = execute_batch_query(
table_name=table_name,
search_column=search_column,
return_columns=return_columns,
search_values=search_values
)
if 'error' in result:
return jsonify(result), 400
# Generate CSV
csv_content = generate_csv_content(result['data'], result['columns'])
return Response(
csv_content,
mimetype='text/csv; charset=utf-8',
headers={
'Content-Disposition': 'attachment; filename=query_result.csv'
}
)

View File

@@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
"""Resource (Equipment) API routes for MES Dashboard.
Contains Flask Blueprint for resource/equipment-related API endpoints.
"""
# -*- coding: utf-8 -*-
"""Resource (Equipment) API routes for MES Dashboard.
Contains Flask Blueprint for resource/equipment-related API endpoints.
"""
from flask import Blueprint, jsonify, request
from mes_dashboard.core.database import get_db_connection
@@ -13,140 +13,140 @@ from mes_dashboard.services.resource_service import (
query_resource_status_summary,
query_resource_by_status,
query_resource_by_workcenter,
query_resource_detail,
query_resource_workcenter_status_matrix,
query_resource_filter_options,
)
# Create Blueprint
resource_bp = Blueprint('resource', __name__, url_prefix='/api/resource')
@resource_bp.route('/summary')
def api_resource_summary():
"""API: Resource status summary."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_summary", days_back)
summary = cache_get(cache_key)
if summary is None:
summary = query_resource_status_summary(days_back)
if summary:
cache_set(cache_key, summary)
if summary:
return jsonify({'success': True, 'data': summary})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/by_status')
def api_resource_by_status():
"""API: Resource count by status."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_by_status", days_back)
data = cache_get(cache_key)
if data is None:
df = query_resource_by_status(days_back)
if df is not None:
data = df.to_dict(orient='records')
cache_set(cache_key, data)
else:
data = None
if data is not None:
return jsonify({'success': True, 'data': data})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/by_workcenter')
def api_resource_by_workcenter():
"""API: Resource count by workcenter."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_by_workcenter", days_back)
data = cache_get(cache_key)
if data is None:
df = query_resource_by_workcenter(days_back)
if df is not None:
data = df.to_dict(orient='records')
cache_set(cache_key, data)
else:
data = None
if data is not None:
return jsonify({'success': True, 'data': data})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/workcenter_status_matrix')
def api_resource_workcenter_status_matrix():
"""API: Resource count matrix by workcenter and status category."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_workcenter_matrix", days_back)
data = cache_get(cache_key)
if data is None:
df = query_resource_workcenter_status_matrix(days_back)
if df is not None:
data = df.to_dict(orient='records')
cache_set(cache_key, data)
else:
data = None
if data is not None:
return jsonify({'success': True, 'data': data})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/detail', methods=['POST'])
def api_resource_detail():
"""API: Resource detail with filters."""
data = request.get_json() or {}
filters = data.get('filters')
limit = data.get('limit', 500)
offset = data.get('offset', 0)
days_back = get_days_back(filters)
df = query_resource_detail(filters, limit, offset, days_back)
if df is not None:
records = df.to_dict(orient='records')
return jsonify({'success': True, 'data': records, 'count': len(records), 'offset': offset})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/filter_options')
def api_resource_filter_options():
"""API: Get filter options."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_filter_options", days_back)
options = cache_get(cache_key)
if options is None:
options = query_resource_filter_options(days_back)
if options:
cache_set(cache_key, options)
if options:
return jsonify({'success': True, 'data': options})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/status_values')
def api_resource_status_values():
"""API: Get all distinct status values with counts (for verification)."""
connection = get_db_connection()
if not connection:
return jsonify({'success': False, 'error': '數據庫連接失敗'}), 500
try:
sql = """
SELECT DISTINCT NEWSTATUSNAME, COUNT(*) as CNT
FROM DW_MES_RESOURCESTATUS
WHERE NEWSTATUSNAME IS NOT NULL
AND LASTSTATUSCHANGEDATE >= SYSDATE - 30
GROUP BY NEWSTATUSNAME
ORDER BY CNT DESC
"""
cursor = connection.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
cursor.close()
connection.close()
data = [{'status': row[0], 'count': row[1]} for row in rows]
return jsonify({'success': True, 'data': data})
except Exception as exc:
if connection:
connection.close()
return jsonify({'success': False, 'error': str(exc)}), 500
query_resource_detail,
query_resource_workcenter_status_matrix,
query_resource_filter_options,
)
# Create Blueprint
resource_bp = Blueprint('resource', __name__, url_prefix='/api/resource')
@resource_bp.route('/summary')
def api_resource_summary():
"""API: Resource status summary."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_summary", days_back)
summary = cache_get(cache_key)
if summary is None:
summary = query_resource_status_summary(days_back)
if summary:
cache_set(cache_key, summary)
if summary:
return jsonify({'success': True, 'data': summary})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/by_status')
def api_resource_by_status():
"""API: Resource count by status."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_by_status", days_back)
data = cache_get(cache_key)
if data is None:
df = query_resource_by_status(days_back)
if df is not None:
data = df.to_dict(orient='records')
cache_set(cache_key, data)
else:
data = None
if data is not None:
return jsonify({'success': True, 'data': data})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/by_workcenter')
def api_resource_by_workcenter():
"""API: Resource count by workcenter."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_by_workcenter", days_back)
data = cache_get(cache_key)
if data is None:
df = query_resource_by_workcenter(days_back)
if df is not None:
data = df.to_dict(orient='records')
cache_set(cache_key, data)
else:
data = None
if data is not None:
return jsonify({'success': True, 'data': data})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/workcenter_status_matrix')
def api_resource_workcenter_status_matrix():
"""API: Resource count matrix by workcenter and status category."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_workcenter_matrix", days_back)
data = cache_get(cache_key)
if data is None:
df = query_resource_workcenter_status_matrix(days_back)
if df is not None:
data = df.to_dict(orient='records')
cache_set(cache_key, data)
else:
data = None
if data is not None:
return jsonify({'success': True, 'data': data})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/detail', methods=['POST'])
def api_resource_detail():
"""API: Resource detail with filters."""
data = request.get_json() or {}
filters = data.get('filters')
limit = data.get('limit', 500)
offset = data.get('offset', 0)
days_back = get_days_back(filters)
df = query_resource_detail(filters, limit, offset, days_back)
if df is not None:
records = df.to_dict(orient='records')
return jsonify({'success': True, 'data': records, 'count': len(records), 'offset': offset})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/filter_options')
def api_resource_filter_options():
"""API: Get filter options."""
days_back = request.args.get('days_back', 30, type=int)
cache_key = make_cache_key("resource_filter_options", days_back)
options = cache_get(cache_key)
if options is None:
options = query_resource_filter_options(days_back)
if options:
cache_set(cache_key, options)
if options:
return jsonify({'success': True, 'data': options})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@resource_bp.route('/status_values')
def api_resource_status_values():
"""API: Get all distinct status values with counts (for verification)."""
connection = get_db_connection()
if not connection:
return jsonify({'success': False, 'error': '數據庫連接失敗'}), 500
try:
sql = """
SELECT DISTINCT NEWSTATUSNAME, COUNT(*) as CNT
FROM DWH.DW_MES_RESOURCESTATUS
WHERE NEWSTATUSNAME IS NOT NULL
AND LASTSTATUSCHANGEDATE >= SYSDATE - 30
GROUP BY NEWSTATUSNAME
ORDER BY CNT DESC
"""
cursor = connection.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
cursor.close()
connection.close()
data = [{'status': row[0], 'count': row[1]} for row in rows]
return jsonify({'success': True, 'data': data})
except Exception as exc:
if connection:
connection.close()
return jsonify({'success': False, 'error': str(exc)}), 500

View File

@@ -1,279 +1,279 @@
# -*- coding: utf-8 -*-
"""WIP API routes for MES Dashboard.
Contains Flask Blueprint for WIP-related API endpoints.
Uses DW_MES_LOT_V view for real-time WIP data.
"""
from flask import Blueprint, jsonify, request
from mes_dashboard.services.wip_service import (
get_wip_summary,
get_wip_matrix,
get_wip_hold_summary,
get_wip_detail,
get_workcenters,
get_packages,
search_workorders,
search_lot_ids,
)
# Create Blueprint
wip_bp = Blueprint('wip', __name__, url_prefix='/api/wip')
def _parse_bool(value: str) -> bool:
"""Parse boolean from query string."""
return value.lower() in ('true', '1', 'yes') if value else False
# ============================================================
# Overview APIs
# ============================================================
@wip_bp.route('/overview/summary')
def api_overview_summary():
"""API: Get WIP KPI summary for overview dashboard.
Query Parameters:
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with totalLots, totalQtyPcs, byWipStatus, dataUpdateDate
"""
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_wip_summary(
include_dummy=include_dummy,
workorder=workorder,
lotid=lotid
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/overview/matrix')
def api_overview_matrix():
"""API: Get workcenter x product line matrix for overview dashboard.
Query Parameters:
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
status: Optional WIP status filter ('RUN', 'QUEUE', 'HOLD')
hold_type: Optional hold type filter ('quality', 'non-quality')
Only effective when status='HOLD'
Returns:
JSON with workcenters, packages, matrix, workcenter_totals,
package_totals, grand_total
"""
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
status = request.args.get('status', '').strip().upper() or None
hold_type = request.args.get('hold_type', '').strip().lower() or None
# Validate status parameter
if status and status not in ('RUN', 'QUEUE', 'HOLD'):
return jsonify({
'success': False,
'error': 'Invalid status. Use RUN, QUEUE, or HOLD'
}), 400
# Validate hold_type parameter
if hold_type and hold_type not in ('quality', 'non-quality'):
return jsonify({
'success': False,
'error': 'Invalid hold_type. Use quality or non-quality'
}), 400
result = get_wip_matrix(
include_dummy=include_dummy,
workorder=workorder,
lotid=lotid,
status=status,
hold_type=hold_type
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/overview/hold')
def api_overview_hold():
"""API: Get hold summary grouped by hold reason.
Query Parameters:
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with items list containing reason, lots, qty
"""
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_wip_hold_summary(
include_dummy=include_dummy,
workorder=workorder,
lotid=lotid
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
# ============================================================
# Detail APIs
# ============================================================
@wip_bp.route('/detail/<workcenter>')
def api_detail(workcenter: str):
"""API: Get WIP detail for a specific workcenter group.
Args:
workcenter: WORKCENTER_GROUP name (URL path parameter)
Query Parameters:
package: Optional PRODUCTLINENAME filter
status: Optional WIP status filter ('RUN', 'QUEUE', 'HOLD')
hold_type: Optional hold type filter ('quality', 'non-quality')
Only effective when status='HOLD'
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
page: Page number (default 1)
page_size: Records per page (default 100, max 500)
Returns:
JSON with workcenter, summary, specs, lots, pagination, sys_date
"""
package = request.args.get('package', '').strip() or None
status = request.args.get('status', '').strip().upper() or None
hold_type = request.args.get('hold_type', '').strip().lower() or None
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
page = request.args.get('page', 1, type=int)
page_size = min(request.args.get('page_size', 100, type=int), 500)
if page < 1:
page = 1
# Validate status parameter
if status and status not in ('RUN', 'QUEUE', 'HOLD'):
return jsonify({
'success': False,
'error': 'Invalid status. Use RUN, QUEUE, or HOLD'
}), 400
# Validate hold_type parameter
if hold_type and hold_type not in ('quality', 'non-quality'):
return jsonify({
'success': False,
'error': 'Invalid hold_type. Use quality or non-quality'
}), 400
result = get_wip_detail(
workcenter=workcenter,
package=package,
status=status,
hold_type=hold_type,
workorder=workorder,
lotid=lotid,
include_dummy=include_dummy,
page=page,
page_size=page_size
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
# ============================================================
# Meta APIs
# ============================================================
@wip_bp.route('/meta/workcenters')
def api_meta_workcenters():
"""API: Get list of workcenter groups with lot counts.
Query Parameters:
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with list of {name, lot_count} sorted by sequence
"""
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_workcenters(include_dummy=include_dummy)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/meta/packages')
def api_meta_packages():
"""API: Get list of packages (product lines) with lot counts.
Query Parameters:
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with list of {name, lot_count} sorted by count desc
"""
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_packages(include_dummy=include_dummy)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/meta/search')
def api_meta_search():
"""API: Search for WORKORDER or LOTID values.
Query Parameters:
type: Search type ('workorder' or 'lotid')
q: Search query (minimum 2 characters)
limit: Maximum results (default: 20, max: 50)
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with items list containing matching values
"""
search_type = request.args.get('type', '').strip().lower()
q = request.args.get('q', '').strip()
limit = min(request.args.get('limit', 20, type=int), 50)
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
# Validate search type
if search_type not in ('workorder', 'lotid'):
return jsonify({
'success': False,
'error': 'Invalid type. Use "workorder" or "lotid"'
}), 400
# Validate query length
if len(q) < 2:
return jsonify({'success': True, 'data': {'items': []}})
# Perform search
if search_type == 'workorder':
result = search_workorders(q=q, limit=limit, include_dummy=include_dummy)
else:
result = search_lot_ids(q=q, limit=limit, include_dummy=include_dummy)
if result is not None:
return jsonify({'success': True, 'data': {'items': result}})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
# -*- coding: utf-8 -*-
"""WIP API routes for MES Dashboard.
Contains Flask Blueprint for WIP-related API endpoints.
Uses DWH.DW_MES_LOT_V view for real-time WIP data.
"""
from flask import Blueprint, jsonify, request
from mes_dashboard.services.wip_service import (
get_wip_summary,
get_wip_matrix,
get_wip_hold_summary,
get_wip_detail,
get_workcenters,
get_packages,
search_workorders,
search_lot_ids,
)
# Create Blueprint
wip_bp = Blueprint('wip', __name__, url_prefix='/api/wip')
def _parse_bool(value: str) -> bool:
"""Parse boolean from query string."""
return value.lower() in ('true', '1', 'yes') if value else False
# ============================================================
# Overview APIs
# ============================================================
@wip_bp.route('/overview/summary')
def api_overview_summary():
"""API: Get WIP KPI summary for overview dashboard.
Query Parameters:
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with totalLots, totalQtyPcs, byWipStatus, dataUpdateDate
"""
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_wip_summary(
include_dummy=include_dummy,
workorder=workorder,
lotid=lotid
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/overview/matrix')
def api_overview_matrix():
"""API: Get workcenter x product line matrix for overview dashboard.
Query Parameters:
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
status: Optional WIP status filter ('RUN', 'QUEUE', 'HOLD')
hold_type: Optional hold type filter ('quality', 'non-quality')
Only effective when status='HOLD'
Returns:
JSON with workcenters, packages, matrix, workcenter_totals,
package_totals, grand_total
"""
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
status = request.args.get('status', '').strip().upper() or None
hold_type = request.args.get('hold_type', '').strip().lower() or None
# Validate status parameter
if status and status not in ('RUN', 'QUEUE', 'HOLD'):
return jsonify({
'success': False,
'error': 'Invalid status. Use RUN, QUEUE, or HOLD'
}), 400
# Validate hold_type parameter
if hold_type and hold_type not in ('quality', 'non-quality'):
return jsonify({
'success': False,
'error': 'Invalid hold_type. Use quality or non-quality'
}), 400
result = get_wip_matrix(
include_dummy=include_dummy,
workorder=workorder,
lotid=lotid,
status=status,
hold_type=hold_type
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/overview/hold')
def api_overview_hold():
"""API: Get hold summary grouped by hold reason.
Query Parameters:
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with items list containing reason, lots, qty
"""
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_wip_hold_summary(
include_dummy=include_dummy,
workorder=workorder,
lotid=lotid
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
# ============================================================
# Detail APIs
# ============================================================
@wip_bp.route('/detail/<workcenter>')
def api_detail(workcenter: str):
"""API: Get WIP detail for a specific workcenter group.
Args:
workcenter: WORKCENTER_GROUP name (URL path parameter)
Query Parameters:
package: Optional PRODUCTLINENAME filter
status: Optional WIP status filter ('RUN', 'QUEUE', 'HOLD')
hold_type: Optional hold type filter ('quality', 'non-quality')
Only effective when status='HOLD'
workorder: Optional WORKORDER filter (fuzzy match)
lotid: Optional LOTID filter (fuzzy match)
include_dummy: Include DUMMY lots (default: false)
page: Page number (default 1)
page_size: Records per page (default 100, max 500)
Returns:
JSON with workcenter, summary, specs, lots, pagination, sys_date
"""
package = request.args.get('package', '').strip() or None
status = request.args.get('status', '').strip().upper() or None
hold_type = request.args.get('hold_type', '').strip().lower() or None
workorder = request.args.get('workorder', '').strip() or None
lotid = request.args.get('lotid', '').strip() or None
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
page = request.args.get('page', 1, type=int)
page_size = min(request.args.get('page_size', 100, type=int), 500)
if page < 1:
page = 1
# Validate status parameter
if status and status not in ('RUN', 'QUEUE', 'HOLD'):
return jsonify({
'success': False,
'error': 'Invalid status. Use RUN, QUEUE, or HOLD'
}), 400
# Validate hold_type parameter
if hold_type and hold_type not in ('quality', 'non-quality'):
return jsonify({
'success': False,
'error': 'Invalid hold_type. Use quality or non-quality'
}), 400
result = get_wip_detail(
workcenter=workcenter,
package=package,
status=status,
hold_type=hold_type,
workorder=workorder,
lotid=lotid,
include_dummy=include_dummy,
page=page,
page_size=page_size
)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
# ============================================================
# Meta APIs
# ============================================================
@wip_bp.route('/meta/workcenters')
def api_meta_workcenters():
"""API: Get list of workcenter groups with lot counts.
Query Parameters:
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with list of {name, lot_count} sorted by sequence
"""
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_workcenters(include_dummy=include_dummy)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/meta/packages')
def api_meta_packages():
"""API: Get list of packages (product lines) with lot counts.
Query Parameters:
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with list of {name, lot_count} sorted by count desc
"""
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
result = get_packages(include_dummy=include_dummy)
if result is not None:
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '查詢失敗'}), 500
@wip_bp.route('/meta/search')
def api_meta_search():
"""API: Search for WORKORDER or LOTID values.
Query Parameters:
type: Search type ('workorder' or 'lotid')
q: Search query (minimum 2 characters)
limit: Maximum results (default: 20, max: 50)
include_dummy: Include DUMMY lots (default: false)
Returns:
JSON with items list containing matching values
"""
search_type = request.args.get('type', '').strip().lower()
q = request.args.get('q', '').strip()
limit = min(request.args.get('limit', 20, type=int), 50)
include_dummy = _parse_bool(request.args.get('include_dummy', ''))
# Validate search type
if search_type not in ('workorder', 'lotid'):
return jsonify({
'success': False,
'error': 'Invalid type. Use "workorder" or "lotid"'
}), 400
# Validate query length
if len(q) < 2:
return jsonify({'success': True, 'data': {'items': []}})
# Perform search
if search_type == 'workorder':
result = search_workorders(q=q, limit=limit, include_dummy=include_dummy)
else:
result = search_lot_ids(q=q, limit=limit, include_dummy=include_dummy)
if result is not None:
return jsonify({'success': True, 'data': {'items': result}})
return jsonify({'success': False, 'error': '查詢失敗'}), 500

File diff suppressed because it is too large Load Diff

View File

@@ -1,261 +1,261 @@
# -*- coding: utf-8 -*-
"""Cached filter options for MES Dashboard.
Provides cached workcenter groups and resource families for filter dropdowns.
Data is loaded from database and cached in memory with periodic refresh.
"""
import logging
import threading
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from mes_dashboard.core.database import read_sql_df
logger = logging.getLogger('mes_dashboard.filter_cache')
# ============================================================
# Cache Configuration
# ============================================================
CACHE_TTL_SECONDS = 3600 # 1 hour cache TTL
WIP_VIEW = "DW_MES_LOT_V"
# ============================================================
# Cache Storage
# ============================================================
_CACHE = {
'workcenter_groups': None, # List of {name, sequence}
'workcenter_mapping': None, # Dict {workcentername: {group, sequence}}
'last_refresh': None,
'is_loading': False,
}
_CACHE_LOCK = threading.Lock()
# ============================================================
# Workcenter Group Functions
# ============================================================
def get_workcenter_groups(force_refresh: bool = False) -> Optional[List[Dict[str, Any]]]:
"""Get list of workcenter groups with sequence order.
Returns:
List of {name, sequence} sorted by sequence, or None if loading fails.
"""
_ensure_cache_loaded(force_refresh)
return _CACHE.get('workcenter_groups')
def get_workcenter_mapping(force_refresh: bool = False) -> Optional[Dict[str, Dict[str, Any]]]:
"""Get workcenter name to group mapping.
Returns:
Dict mapping workcentername to {group, sequence}, or None if loading fails.
"""
_ensure_cache_loaded(force_refresh)
return _CACHE.get('workcenter_mapping')
def get_workcenters_for_groups(groups: List[str]) -> List[str]:
"""Get list of workcenter names that belong to specified groups.
Args:
groups: List of WORKCENTER_GROUP names
Returns:
List of WORKCENTERNAME values belonging to those groups
"""
mapping = get_workcenter_mapping()
if not mapping:
return []
result = []
for wc_name, info in mapping.items():
if info.get('group') in groups:
result.append(wc_name)
return result
# ============================================================
# Cache Management
# ============================================================
def get_cache_status() -> Dict[str, Any]:
"""Get current cache status.
Returns:
Dict with cache status information
"""
with _CACHE_LOCK:
last_refresh = _CACHE.get('last_refresh')
return {
'loaded': last_refresh is not None,
'last_refresh': last_refresh.isoformat() if last_refresh else None,
'is_loading': _CACHE.get('is_loading', False),
'workcenter_groups_count': len(_CACHE.get('workcenter_groups') or []),
'workcenter_mapping_count': len(_CACHE.get('workcenter_mapping') or {}),
}
def refresh_cache() -> bool:
"""Force refresh the cache.
Returns:
True if refresh succeeded, False otherwise
"""
return _load_cache()
def _ensure_cache_loaded(force_refresh: bool = False):
"""Ensure cache is loaded and not stale."""
with _CACHE_LOCK:
now = datetime.now()
last_refresh = _CACHE.get('last_refresh')
is_loading = _CACHE.get('is_loading', False)
# Check if cache is valid
cache_valid = (
last_refresh is not None and
(now - last_refresh).total_seconds() < CACHE_TTL_SECONDS
)
if cache_valid and not force_refresh:
return
if is_loading:
return # Another thread is loading
# Load cache (outside lock to avoid blocking)
_load_cache()
def _load_cache() -> bool:
"""Load all cache data from database.
Returns:
True if loading succeeded, False otherwise
"""
with _CACHE_LOCK:
if _CACHE.get('is_loading'):
return False
_CACHE['is_loading'] = True
try:
# Load workcenter groups from DW_MES_LOT_V
wc_groups, wc_mapping = _load_workcenter_data()
with _CACHE_LOCK:
_CACHE['workcenter_groups'] = wc_groups
_CACHE['workcenter_mapping'] = wc_mapping
_CACHE['last_refresh'] = datetime.now()
_CACHE['is_loading'] = False
logger.info(
f"Filter cache refreshed: {len(wc_groups or [])} groups, "
f"{len(wc_mapping or {})} workcenters"
)
return True
except Exception as exc:
logger.error(f"Failed to load filter cache: {exc}")
with _CACHE_LOCK:
_CACHE['is_loading'] = False
return False
def _load_workcenter_data():
"""Load workcenter group data from WIP cache (Redis) or fallback to Oracle.
Returns:
Tuple of (groups_list, mapping_dict)
"""
# Try to load from WIP Redis cache first
try:
from mes_dashboard.core.cache import get_cached_wip_data
df = get_cached_wip_data()
if df is not None and not df.empty:
logger.debug("Loading workcenter groups from WIP cache")
return _extract_workcenter_data_from_df(df)
except Exception as exc:
logger.warning(f"Failed to load from WIP cache: {exc}")
# Fallback to Oracle direct query
logger.debug("Falling back to Oracle for workcenter groups")
try:
sql = f"""
SELECT DISTINCT
WORKCENTERNAME,
WORKCENTERID,
WORKCENTER_GROUP,
WORKCENTERSEQUENCE_GROUP
FROM {WIP_VIEW}
WHERE WORKCENTER_GROUP IS NOT NULL
AND WORKCENTERNAME IS NOT NULL
"""
df = read_sql_df(sql)
if df is None or df.empty:
logger.warning("No workcenter data found in DW_MES_LOT_V")
return [], {}
return _extract_workcenter_data_from_df(df)
except Exception as exc:
logger.error(f"Failed to load workcenter data: {exc}")
return [], {}
def _extract_workcenter_data_from_df(df):
"""Extract workcenter groups and mapping from DataFrame.
Args:
df: DataFrame with WORKCENTERNAME, WORKCENTER_GROUP, WORKCENTERSEQUENCE_GROUP columns
Returns:
Tuple of (groups_list, mapping_dict)
"""
# Filter to rows with valid workcenter group
df = df[df['WORKCENTER_GROUP'].notna() & df['WORKCENTERNAME'].notna()]
if df.empty:
return [], {}
# Build groups list (unique groups, take minimum sequence for each group)
groups_df = df.groupby('WORKCENTER_GROUP')['WORKCENTERSEQUENCE_GROUP'].min().reset_index()
groups_df = groups_df.sort_values('WORKCENTERSEQUENCE_GROUP')
groups = []
for _, row in groups_df.iterrows():
groups.append({
'name': row['WORKCENTER_GROUP'],
'sequence': int(row['WORKCENTERSEQUENCE_GROUP'] or 999)
})
# Build mapping dict
mapping = {}
for _, row in df.iterrows():
wc_name = row['WORKCENTERNAME']
mapping[wc_name] = {
'id': row.get('WORKCENTERID'),
'group': row['WORKCENTER_GROUP'],
'sequence': int(row['WORKCENTERSEQUENCE_GROUP'] or 999)
}
return groups, mapping
# ============================================================
# Initialization
# ============================================================
def init_cache():
"""Initialize the cache on application startup.
Should be called during app initialization.
"""
logger.info("Initializing filter cache...")
_load_cache()
# -*- coding: utf-8 -*-
"""Cached filter options for MES Dashboard.
Provides cached workcenter groups and resource families for filter dropdowns.
Data is loaded from database and cached in memory with periodic refresh.
"""
import logging
import threading
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from mes_dashboard.core.database import read_sql_df
logger = logging.getLogger('mes_dashboard.filter_cache')
# ============================================================
# Cache Configuration
# ============================================================
CACHE_TTL_SECONDS = 3600 # 1 hour cache TTL
WIP_VIEW = "DWH.DW_MES_LOT_V"
# ============================================================
# Cache Storage
# ============================================================
_CACHE = {
'workcenter_groups': None, # List of {name, sequence}
'workcenter_mapping': None, # Dict {workcentername: {group, sequence}}
'last_refresh': None,
'is_loading': False,
}
_CACHE_LOCK = threading.Lock()
# ============================================================
# Workcenter Group Functions
# ============================================================
def get_workcenter_groups(force_refresh: bool = False) -> Optional[List[Dict[str, Any]]]:
"""Get list of workcenter groups with sequence order.
Returns:
List of {name, sequence} sorted by sequence, or None if loading fails.
"""
_ensure_cache_loaded(force_refresh)
return _CACHE.get('workcenter_groups')
def get_workcenter_mapping(force_refresh: bool = False) -> Optional[Dict[str, Dict[str, Any]]]:
"""Get workcenter name to group mapping.
Returns:
Dict mapping workcentername to {group, sequence}, or None if loading fails.
"""
_ensure_cache_loaded(force_refresh)
return _CACHE.get('workcenter_mapping')
def get_workcenters_for_groups(groups: List[str]) -> List[str]:
"""Get list of workcenter names that belong to specified groups.
Args:
groups: List of WORKCENTER_GROUP names
Returns:
List of WORKCENTERNAME values belonging to those groups
"""
mapping = get_workcenter_mapping()
if not mapping:
return []
result = []
for wc_name, info in mapping.items():
if info.get('group') in groups:
result.append(wc_name)
return result
# ============================================================
# Cache Management
# ============================================================
def get_cache_status() -> Dict[str, Any]:
"""Get current cache status.
Returns:
Dict with cache status information
"""
with _CACHE_LOCK:
last_refresh = _CACHE.get('last_refresh')
return {
'loaded': last_refresh is not None,
'last_refresh': last_refresh.isoformat() if last_refresh else None,
'is_loading': _CACHE.get('is_loading', False),
'workcenter_groups_count': len(_CACHE.get('workcenter_groups') or []),
'workcenter_mapping_count': len(_CACHE.get('workcenter_mapping') or {}),
}
def refresh_cache() -> bool:
"""Force refresh the cache.
Returns:
True if refresh succeeded, False otherwise
"""
return _load_cache()
def _ensure_cache_loaded(force_refresh: bool = False):
"""Ensure cache is loaded and not stale."""
with _CACHE_LOCK:
now = datetime.now()
last_refresh = _CACHE.get('last_refresh')
is_loading = _CACHE.get('is_loading', False)
# Check if cache is valid
cache_valid = (
last_refresh is not None and
(now - last_refresh).total_seconds() < CACHE_TTL_SECONDS
)
if cache_valid and not force_refresh:
return
if is_loading:
return # Another thread is loading
# Load cache (outside lock to avoid blocking)
_load_cache()
def _load_cache() -> bool:
"""Load all cache data from database.
Returns:
True if loading succeeded, False otherwise
"""
with _CACHE_LOCK:
if _CACHE.get('is_loading'):
return False
_CACHE['is_loading'] = True
try:
# Load workcenter groups from DWH.DW_MES_LOT_V
wc_groups, wc_mapping = _load_workcenter_data()
with _CACHE_LOCK:
_CACHE['workcenter_groups'] = wc_groups
_CACHE['workcenter_mapping'] = wc_mapping
_CACHE['last_refresh'] = datetime.now()
_CACHE['is_loading'] = False
logger.info(
f"Filter cache refreshed: {len(wc_groups or [])} groups, "
f"{len(wc_mapping or {})} workcenters"
)
return True
except Exception as exc:
logger.error(f"Failed to load filter cache: {exc}")
with _CACHE_LOCK:
_CACHE['is_loading'] = False
return False
def _load_workcenter_data():
"""Load workcenter group data from WIP cache (Redis) or fallback to Oracle.
Returns:
Tuple of (groups_list, mapping_dict)
"""
# Try to load from WIP Redis cache first
try:
from mes_dashboard.core.cache import get_cached_wip_data
df = get_cached_wip_data()
if df is not None and not df.empty:
logger.debug("Loading workcenter groups from WIP cache")
return _extract_workcenter_data_from_df(df)
except Exception as exc:
logger.warning(f"Failed to load from WIP cache: {exc}")
# Fallback to Oracle direct query
logger.debug("Falling back to Oracle for workcenter groups")
try:
sql = f"""
SELECT DISTINCT
WORKCENTERNAME,
WORKCENTERID,
WORKCENTER_GROUP,
WORKCENTERSEQUENCE_GROUP
FROM {WIP_VIEW}
WHERE WORKCENTER_GROUP IS NOT NULL
AND WORKCENTERNAME IS NOT NULL
"""
df = read_sql_df(sql)
if df is None or df.empty:
logger.warning("No workcenter data found in DWH.DW_MES_LOT_V")
return [], {}
return _extract_workcenter_data_from_df(df)
except Exception as exc:
logger.error(f"Failed to load workcenter data: {exc}")
return [], {}
def _extract_workcenter_data_from_df(df):
"""Extract workcenter groups and mapping from DataFrame.
Args:
df: DataFrame with WORKCENTERNAME, WORKCENTER_GROUP, WORKCENTERSEQUENCE_GROUP columns
Returns:
Tuple of (groups_list, mapping_dict)
"""
# Filter to rows with valid workcenter group
df = df[df['WORKCENTER_GROUP'].notna() & df['WORKCENTERNAME'].notna()]
if df.empty:
return [], {}
# Build groups list (unique groups, take minimum sequence for each group)
groups_df = df.groupby('WORKCENTER_GROUP')['WORKCENTERSEQUENCE_GROUP'].min().reset_index()
groups_df = groups_df.sort_values('WORKCENTERSEQUENCE_GROUP')
groups = []
for _, row in groups_df.iterrows():
groups.append({
'name': row['WORKCENTER_GROUP'],
'sequence': int(row['WORKCENTERSEQUENCE_GROUP'] or 999)
})
# Build mapping dict
mapping = {}
for _, row in df.iterrows():
wc_name = row['WORKCENTERNAME']
mapping[wc_name] = {
'id': row.get('WORKCENTERID'),
'group': row['WORKCENTER_GROUP'],
'sequence': int(row['WORKCENTERSEQUENCE_GROUP'] or 999)
}
return groups, mapping
# ============================================================
# Initialization
# ============================================================
def init_cache():
"""Initialize the cache on application startup.
Should be called during app initialization.
"""
logger.info("Initializing filter cache...")
_load_cache()

View File

@@ -1,476 +1,476 @@
# -*- coding: utf-8 -*-
"""Resource Cache - DW_MES_RESOURCE 全表快取模組.
全表快取套用全域篩選後的設備主檔資料至 Redis。
提供統一 API 供各模組取用設備資料和篩選器選項。
"""
from __future__ import annotations
import io
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
import pandas as pd
from mes_dashboard.core.redis_client import (
get_redis_client,
redis_available,
REDIS_ENABLED,
REDIS_KEY_PREFIX,
)
from mes_dashboard.core.database import read_sql_df
from mes_dashboard.config.constants import (
EXCLUDED_LOCATIONS,
EXCLUDED_ASSET_STATUSES,
EQUIPMENT_TYPE_FILTER,
)
logger = logging.getLogger('mes_dashboard.resource_cache')
# ============================================================
# Configuration
# ============================================================
RESOURCE_CACHE_ENABLED = os.getenv('RESOURCE_CACHE_ENABLED', 'true').lower() == 'true'
RESOURCE_SYNC_INTERVAL = int(os.getenv('RESOURCE_SYNC_INTERVAL', '14400')) # 4 hours
# Redis key helpers
def _get_key(key: str) -> str:
"""Get full Redis key with resource prefix."""
return f"{REDIS_KEY_PREFIX}:resource:{key}"
# ============================================================
# Internal: Oracle Load Functions
# ============================================================
def _build_filter_sql() -> str:
"""Build SQL WHERE clause for global filters."""
conditions = [EQUIPMENT_TYPE_FILTER.strip()]
# Location filter
if EXCLUDED_LOCATIONS:
locations_list = ", ".join(f"'{loc}'" for loc in EXCLUDED_LOCATIONS)
conditions.append(
f"(LOCATIONNAME IS NULL OR LOCATIONNAME NOT IN ({locations_list}))"
)
# Asset status filter
if EXCLUDED_ASSET_STATUSES:
status_list = ", ".join(f"'{s}'" for s in EXCLUDED_ASSET_STATUSES)
conditions.append(
f"(PJ_ASSETSSTATUS IS NULL OR PJ_ASSETSSTATUS NOT IN ({status_list}))"
)
return " AND ".join(conditions)
def _load_from_oracle() -> Optional[pd.DataFrame]:
"""從 Oracle 載入全表資料(套用全域篩選).
Returns:
DataFrame with all columns, or None if query failed.
"""
filter_sql = _build_filter_sql()
sql = f"""
SELECT *
FROM DW_MES_RESOURCE
WHERE {filter_sql}
"""
try:
df = read_sql_df(sql)
if df is not None:
logger.info(f"Loaded {len(df)} resources from Oracle")
return df
except Exception as e:
logger.error(f"Failed to load resources from Oracle: {e}")
return None
def _get_version_from_oracle() -> Optional[str]:
"""取得 Oracle 資料版本MAX(LASTCHANGEDATE).
Returns:
Version string (ISO format), or None if query failed.
"""
filter_sql = _build_filter_sql()
sql = f"""
SELECT MAX(LASTCHANGEDATE) as VERSION
FROM DW_MES_RESOURCE
WHERE {filter_sql}
"""
try:
df = read_sql_df(sql)
if df is not None and not df.empty:
version = df.iloc[0]['VERSION']
if version is not None:
if hasattr(version, 'isoformat'):
return version.isoformat()
return str(version)
return None
except Exception as e:
logger.error(f"Failed to get version from Oracle: {e}")
return None
# ============================================================
# Internal: Redis Functions
# ============================================================
def _get_version_from_redis() -> Optional[str]:
"""取得 Redis 快取版本.
Returns:
Cached version string, or None.
"""
client = get_redis_client()
if client is None:
return None
try:
return client.get(_get_key("meta:version"))
except Exception as e:
logger.warning(f"Failed to get version from Redis: {e}")
return None
def _sync_to_redis(df: pd.DataFrame, version: str) -> bool:
"""同步至 Redis使用 pipeline 確保原子性).
Args:
df: DataFrame with resource data.
version: Version string (MAX(LASTCHANGEDATE)).
Returns:
True if sync was successful.
"""
client = get_redis_client()
if client is None:
return False
try:
# Convert DataFrame to JSON
# Handle datetime columns
df_copy = df.copy()
for col in df_copy.select_dtypes(include=['datetime64']).columns:
df_copy[col] = df_copy[col].astype(str)
data_json = df_copy.to_json(orient='records', force_ascii=False)
# Atomic update using pipeline
now = datetime.now().isoformat()
pipe = client.pipeline()
pipe.set(_get_key("data"), data_json)
pipe.set(_get_key("meta:version"), version)
pipe.set(_get_key("meta:updated"), now)
pipe.set(_get_key("meta:count"), str(len(df)))
pipe.execute()
logger.info(f"Resource cache synced: {len(df)} rows, version={version}")
return True
except Exception as e:
logger.error(f"Failed to sync to Redis: {e}")
return False
def _get_cached_data() -> Optional[pd.DataFrame]:
"""Get cached resource data from Redis.
Returns:
DataFrame with resource data, or None if cache miss.
"""
if not REDIS_ENABLED or not RESOURCE_CACHE_ENABLED:
return None
client = get_redis_client()
if client is None:
return None
try:
data_json = client.get(_get_key("data"))
if data_json is None:
logger.debug("Resource cache miss: no data in Redis")
return None
df = pd.read_json(io.StringIO(data_json), orient='records')
logger.debug(f"Resource cache hit: loaded {len(df)} rows from Redis")
return df
except Exception as e:
logger.warning(f"Failed to read resource cache: {e}")
return None
# ============================================================
# Cache Management API
# ============================================================
def refresh_cache(force: bool = False) -> bool:
"""手動刷新快取.
Args:
force: 強制刷新,忽略版本檢查.
Returns:
True if cache was refreshed.
"""
if not REDIS_ENABLED or not RESOURCE_CACHE_ENABLED:
logger.info("Resource cache is disabled")
return False
if not redis_available():
logger.warning("Redis not available, cannot refresh resource cache")
return False
try:
# Get versions
oracle_version = _get_version_from_oracle()
if oracle_version is None:
logger.error("Failed to get version from Oracle")
return False
redis_version = _get_version_from_redis()
# Check if update needed
if not force and redis_version == oracle_version:
logger.debug(f"Resource cache version unchanged ({oracle_version}), skipping")
return False
logger.info(f"Resource cache version changed: {redis_version} -> {oracle_version}")
# Load and sync
df = _load_from_oracle()
if df is None or df.empty:
logger.error("Failed to load resources from Oracle")
return False
return _sync_to_redis(df, oracle_version)
except Exception as e:
logger.error(f"Failed to refresh resource cache: {e}", exc_info=True)
return False
def init_cache() -> None:
"""初始化快取(應用啟動時呼叫)."""
if not REDIS_ENABLED or not RESOURCE_CACHE_ENABLED:
logger.info("Resource cache is disabled, skipping init")
return
if not redis_available():
logger.warning("Redis not available during resource cache init")
return
# Check if cache exists
client = get_redis_client()
if client is None:
return
try:
exists = client.exists(_get_key("data"))
if not exists:
logger.info("Resource cache empty, performing initial load...")
refresh_cache(force=True)
else:
logger.info("Resource cache already populated")
except Exception as e:
logger.error(f"Failed to init resource cache: {e}")
def get_cache_status() -> Dict[str, Any]:
"""取得快取狀態資訊.
Returns:
Dict with cache status.
"""
status = {
'enabled': REDIS_ENABLED and RESOURCE_CACHE_ENABLED,
'loaded': False,
'count': 0,
'version': None,
'updated_at': None,
}
if not status['enabled']:
return status
client = get_redis_client()
if client is None:
return status
try:
status['loaded'] = client.exists(_get_key("data")) > 0
if status['loaded']:
count_str = client.get(_get_key("meta:count"))
status['count'] = int(count_str) if count_str else 0
status['version'] = client.get(_get_key("meta:version"))
status['updated_at'] = client.get(_get_key("meta:updated"))
except Exception as e:
logger.warning(f"Failed to get resource cache status: {e}")
return status
# ============================================================
# Query API
# ============================================================
def get_all_resources() -> List[Dict]:
"""取得所有快取中的設備資料(全欄位).
Falls back to Oracle if cache unavailable.
Returns:
List of resource dicts.
"""
# Try cache first
df = _get_cached_data()
if df is not None:
return df.to_dict(orient='records')
# Fallback to Oracle
logger.info("Resource cache miss, falling back to Oracle")
df = _load_from_oracle()
if df is not None:
return df.to_dict(orient='records')
return []
def get_resource_by_id(resource_id: str) -> Optional[Dict]:
"""依 RESOURCEID 取得單筆設備資料.
Args:
resource_id: The RESOURCEID to look up.
Returns:
Resource dict, or None if not found.
"""
resources = get_all_resources()
for r in resources:
if r.get('RESOURCEID') == resource_id:
return r
return None
def get_resources_by_ids(resource_ids: List[str]) -> List[Dict]:
"""依 RESOURCEID 清單批次取得設備資料.
Args:
resource_ids: List of RESOURCEIDs to look up.
Returns:
List of matching resource dicts.
"""
id_set = set(resource_ids)
resources = get_all_resources()
return [r for r in resources if r.get('RESOURCEID') in id_set]
def get_resources_by_filter(
workcenters: Optional[List[str]] = None,
families: Optional[List[str]] = None,
departments: Optional[List[str]] = None,
locations: Optional[List[str]] = None,
is_production: Optional[bool] = None,
is_key: Optional[bool] = None,
is_monitor: Optional[bool] = None,
) -> List[Dict]:
"""依條件篩選設備資料(在 Python 端篩選).
Args:
workcenters: Filter by WORKCENTERNAME values.
families: Filter by RESOURCEFAMILYNAME values.
departments: Filter by PJ_DEPARTMENT values.
locations: Filter by LOCATIONNAME values.
is_production: Filter by PJ_ISPRODUCTION flag.
is_key: Filter by PJ_ISKEY flag.
is_monitor: Filter by PJ_ISMONITOR flag.
Returns:
List of matching resource dicts.
"""
resources = get_all_resources()
result = []
for r in resources:
# Apply filters
if workcenters and r.get('WORKCENTERNAME') not in workcenters:
continue
if families and r.get('RESOURCEFAMILYNAME') not in families:
continue
if departments and r.get('PJ_DEPARTMENT') not in departments:
continue
if locations and r.get('LOCATIONNAME') not in locations:
continue
if is_production is not None:
val = r.get('PJ_ISPRODUCTION')
if (val == 1) != is_production:
continue
if is_key is not None:
val = r.get('PJ_ISKEY')
if (val == 1) != is_key:
continue
if is_monitor is not None:
val = r.get('PJ_ISMONITOR')
if (val == 1) != is_monitor:
continue
result.append(r)
return result
# ============================================================
# Distinct Values API (for filters)
# ============================================================
def get_distinct_values(column: str) -> List[str]:
"""取得指定欄位的唯一值清單(排序後).
Args:
column: Column name (e.g., 'RESOURCEFAMILYNAME').
Returns:
Sorted list of unique values (excluding None, NaN, and empty strings).
"""
resources = get_all_resources()
values = set()
for r in resources:
val = r.get(column)
# Skip None, empty strings, and NaN (pandas converts NaN to float)
if val is None or val == '':
continue
# Check for NaN (float type and is NaN)
if isinstance(val, float) and pd.isna(val):
continue
values.add(str(val) if not isinstance(val, str) else val)
return sorted(values)
def get_resource_families() -> List[str]:
"""取得型號清單(便捷方法)."""
return get_distinct_values('RESOURCEFAMILYNAME')
def get_workcenters() -> List[str]:
"""取得站點清單(便捷方法)."""
return get_distinct_values('WORKCENTERNAME')
def get_departments() -> List[str]:
"""取得部門清單(便捷方法)."""
return get_distinct_values('PJ_DEPARTMENT')
def get_locations() -> List[str]:
"""取得區域清單(便捷方法)."""
return get_distinct_values('LOCATIONNAME')
def get_vendors() -> List[str]:
"""取得供應商清單(便捷方法)."""
return get_distinct_values('VENDORNAME')
# -*- coding: utf-8 -*-
"""Resource Cache - DWH.DW_MES_RESOURCE 全表快取模組.
全表快取套用全域篩選後的設備主檔資料至 Redis。
提供統一 API 供各模組取用設備資料和篩選器選項。
"""
from __future__ import annotations
import io
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
import pandas as pd
from mes_dashboard.core.redis_client import (
get_redis_client,
redis_available,
REDIS_ENABLED,
REDIS_KEY_PREFIX,
)
from mes_dashboard.core.database import read_sql_df
from mes_dashboard.config.constants import (
EXCLUDED_LOCATIONS,
EXCLUDED_ASSET_STATUSES,
EQUIPMENT_TYPE_FILTER,
)
logger = logging.getLogger('mes_dashboard.resource_cache')
# ============================================================
# Configuration
# ============================================================
RESOURCE_CACHE_ENABLED = os.getenv('RESOURCE_CACHE_ENABLED', 'true').lower() == 'true'
RESOURCE_SYNC_INTERVAL = int(os.getenv('RESOURCE_SYNC_INTERVAL', '14400')) # 4 hours
# Redis key helpers
def _get_key(key: str) -> str:
"""Get full Redis key with resource prefix."""
return f"{REDIS_KEY_PREFIX}:resource:{key}"
# ============================================================
# Internal: Oracle Load Functions
# ============================================================
def _build_filter_sql() -> str:
"""Build SQL WHERE clause for global filters."""
conditions = [EQUIPMENT_TYPE_FILTER.strip()]
# Location filter
if EXCLUDED_LOCATIONS:
locations_list = ", ".join(f"'{loc}'" for loc in EXCLUDED_LOCATIONS)
conditions.append(
f"(LOCATIONNAME IS NULL OR LOCATIONNAME NOT IN ({locations_list}))"
)
# Asset status filter
if EXCLUDED_ASSET_STATUSES:
status_list = ", ".join(f"'{s}'" for s in EXCLUDED_ASSET_STATUSES)
conditions.append(
f"(PJ_ASSETSSTATUS IS NULL OR PJ_ASSETSSTATUS NOT IN ({status_list}))"
)
return " AND ".join(conditions)
def _load_from_oracle() -> Optional[pd.DataFrame]:
"""從 Oracle 載入全表資料(套用全域篩選).
Returns:
DataFrame with all columns, or None if query failed.
"""
filter_sql = _build_filter_sql()
sql = f"""
SELECT *
FROM DWH.DW_MES_RESOURCE
WHERE {filter_sql}
"""
try:
df = read_sql_df(sql)
if df is not None:
logger.info(f"Loaded {len(df)} resources from Oracle")
return df
except Exception as e:
logger.error(f"Failed to load resources from Oracle: {e}")
return None
def _get_version_from_oracle() -> Optional[str]:
"""取得 Oracle 資料版本MAX(LASTCHANGEDATE).
Returns:
Version string (ISO format), or None if query failed.
"""
filter_sql = _build_filter_sql()
sql = f"""
SELECT MAX(LASTCHANGEDATE) as VERSION
FROM DWH.DW_MES_RESOURCE
WHERE {filter_sql}
"""
try:
df = read_sql_df(sql)
if df is not None and not df.empty:
version = df.iloc[0]['VERSION']
if version is not None:
if hasattr(version, 'isoformat'):
return version.isoformat()
return str(version)
return None
except Exception as e:
logger.error(f"Failed to get version from Oracle: {e}")
return None
# ============================================================
# Internal: Redis Functions
# ============================================================
def _get_version_from_redis() -> Optional[str]:
"""取得 Redis 快取版本.
Returns:
Cached version string, or None.
"""
client = get_redis_client()
if client is None:
return None
try:
return client.get(_get_key("meta:version"))
except Exception as e:
logger.warning(f"Failed to get version from Redis: {e}")
return None
def _sync_to_redis(df: pd.DataFrame, version: str) -> bool:
"""同步至 Redis使用 pipeline 確保原子性).
Args:
df: DataFrame with resource data.
version: Version string (MAX(LASTCHANGEDATE)).
Returns:
True if sync was successful.
"""
client = get_redis_client()
if client is None:
return False
try:
# Convert DataFrame to JSON
# Handle datetime columns
df_copy = df.copy()
for col in df_copy.select_dtypes(include=['datetime64']).columns:
df_copy[col] = df_copy[col].astype(str)
data_json = df_copy.to_json(orient='records', force_ascii=False)
# Atomic update using pipeline
now = datetime.now().isoformat()
pipe = client.pipeline()
pipe.set(_get_key("data"), data_json)
pipe.set(_get_key("meta:version"), version)
pipe.set(_get_key("meta:updated"), now)
pipe.set(_get_key("meta:count"), str(len(df)))
pipe.execute()
logger.info(f"Resource cache synced: {len(df)} rows, version={version}")
return True
except Exception as e:
logger.error(f"Failed to sync to Redis: {e}")
return False
def _get_cached_data() -> Optional[pd.DataFrame]:
"""Get cached resource data from Redis.
Returns:
DataFrame with resource data, or None if cache miss.
"""
if not REDIS_ENABLED or not RESOURCE_CACHE_ENABLED:
return None
client = get_redis_client()
if client is None:
return None
try:
data_json = client.get(_get_key("data"))
if data_json is None:
logger.debug("Resource cache miss: no data in Redis")
return None
df = pd.read_json(io.StringIO(data_json), orient='records')
logger.debug(f"Resource cache hit: loaded {len(df)} rows from Redis")
return df
except Exception as e:
logger.warning(f"Failed to read resource cache: {e}")
return None
# ============================================================
# Cache Management API
# ============================================================
def refresh_cache(force: bool = False) -> bool:
"""手動刷新快取.
Args:
force: 強制刷新,忽略版本檢查.
Returns:
True if cache was refreshed.
"""
if not REDIS_ENABLED or not RESOURCE_CACHE_ENABLED:
logger.info("Resource cache is disabled")
return False
if not redis_available():
logger.warning("Redis not available, cannot refresh resource cache")
return False
try:
# Get versions
oracle_version = _get_version_from_oracle()
if oracle_version is None:
logger.error("Failed to get version from Oracle")
return False
redis_version = _get_version_from_redis()
# Check if update needed
if not force and redis_version == oracle_version:
logger.debug(f"Resource cache version unchanged ({oracle_version}), skipping")
return False
logger.info(f"Resource cache version changed: {redis_version} -> {oracle_version}")
# Load and sync
df = _load_from_oracle()
if df is None or df.empty:
logger.error("Failed to load resources from Oracle")
return False
return _sync_to_redis(df, oracle_version)
except Exception as e:
logger.error(f"Failed to refresh resource cache: {e}", exc_info=True)
return False
def init_cache() -> None:
"""初始化快取(應用啟動時呼叫)."""
if not REDIS_ENABLED or not RESOURCE_CACHE_ENABLED:
logger.info("Resource cache is disabled, skipping init")
return
if not redis_available():
logger.warning("Redis not available during resource cache init")
return
# Check if cache exists
client = get_redis_client()
if client is None:
return
try:
exists = client.exists(_get_key("data"))
if not exists:
logger.info("Resource cache empty, performing initial load...")
refresh_cache(force=True)
else:
logger.info("Resource cache already populated")
except Exception as e:
logger.error(f"Failed to init resource cache: {e}")
def get_cache_status() -> Dict[str, Any]:
"""取得快取狀態資訊.
Returns:
Dict with cache status.
"""
status = {
'enabled': REDIS_ENABLED and RESOURCE_CACHE_ENABLED,
'loaded': False,
'count': 0,
'version': None,
'updated_at': None,
}
if not status['enabled']:
return status
client = get_redis_client()
if client is None:
return status
try:
status['loaded'] = client.exists(_get_key("data")) > 0
if status['loaded']:
count_str = client.get(_get_key("meta:count"))
status['count'] = int(count_str) if count_str else 0
status['version'] = client.get(_get_key("meta:version"))
status['updated_at'] = client.get(_get_key("meta:updated"))
except Exception as e:
logger.warning(f"Failed to get resource cache status: {e}")
return status
# ============================================================
# Query API
# ============================================================
def get_all_resources() -> List[Dict]:
"""取得所有快取中的設備資料(全欄位).
Falls back to Oracle if cache unavailable.
Returns:
List of resource dicts.
"""
# Try cache first
df = _get_cached_data()
if df is not None:
return df.to_dict(orient='records')
# Fallback to Oracle
logger.info("Resource cache miss, falling back to Oracle")
df = _load_from_oracle()
if df is not None:
return df.to_dict(orient='records')
return []
def get_resource_by_id(resource_id: str) -> Optional[Dict]:
"""依 RESOURCEID 取得單筆設備資料.
Args:
resource_id: The RESOURCEID to look up.
Returns:
Resource dict, or None if not found.
"""
resources = get_all_resources()
for r in resources:
if r.get('RESOURCEID') == resource_id:
return r
return None
def get_resources_by_ids(resource_ids: List[str]) -> List[Dict]:
"""依 RESOURCEID 清單批次取得設備資料.
Args:
resource_ids: List of RESOURCEIDs to look up.
Returns:
List of matching resource dicts.
"""
id_set = set(resource_ids)
resources = get_all_resources()
return [r for r in resources if r.get('RESOURCEID') in id_set]
def get_resources_by_filter(
workcenters: Optional[List[str]] = None,
families: Optional[List[str]] = None,
departments: Optional[List[str]] = None,
locations: Optional[List[str]] = None,
is_production: Optional[bool] = None,
is_key: Optional[bool] = None,
is_monitor: Optional[bool] = None,
) -> List[Dict]:
"""依條件篩選設備資料(在 Python 端篩選).
Args:
workcenters: Filter by WORKCENTERNAME values.
families: Filter by RESOURCEFAMILYNAME values.
departments: Filter by PJ_DEPARTMENT values.
locations: Filter by LOCATIONNAME values.
is_production: Filter by PJ_ISPRODUCTION flag.
is_key: Filter by PJ_ISKEY flag.
is_monitor: Filter by PJ_ISMONITOR flag.
Returns:
List of matching resource dicts.
"""
resources = get_all_resources()
result = []
for r in resources:
# Apply filters
if workcenters and r.get('WORKCENTERNAME') not in workcenters:
continue
if families and r.get('RESOURCEFAMILYNAME') not in families:
continue
if departments and r.get('PJ_DEPARTMENT') not in departments:
continue
if locations and r.get('LOCATIONNAME') not in locations:
continue
if is_production is not None:
val = r.get('PJ_ISPRODUCTION')
if (val == 1) != is_production:
continue
if is_key is not None:
val = r.get('PJ_ISKEY')
if (val == 1) != is_key:
continue
if is_monitor is not None:
val = r.get('PJ_ISMONITOR')
if (val == 1) != is_monitor:
continue
result.append(r)
return result
# ============================================================
# Distinct Values API (for filters)
# ============================================================
def get_distinct_values(column: str) -> List[str]:
"""取得指定欄位的唯一值清單(排序後).
Args:
column: Column name (e.g., 'RESOURCEFAMILYNAME').
Returns:
Sorted list of unique values (excluding None, NaN, and empty strings).
"""
resources = get_all_resources()
values = set()
for r in resources:
val = r.get(column)
# Skip None, empty strings, and NaN (pandas converts NaN to float)
if val is None or val == '':
continue
# Check for NaN (float type and is NaN)
if isinstance(val, float) and pd.isna(val):
continue
values.add(str(val) if not isinstance(val, str) else val)
return sorted(values)
def get_resource_families() -> List[str]:
"""取得型號清單(便捷方法)."""
return get_distinct_values('RESOURCEFAMILYNAME')
def get_workcenters() -> List[str]:
"""取得站點清單(便捷方法)."""
return get_distinct_values('WORKCENTERNAME')
def get_departments() -> List[str]:
"""取得部門清單(便捷方法)."""
return get_distinct_values('PJ_DEPARTMENT')
def get_locations() -> List[str]:
"""取得區域清單(便捷方法)."""
return get_distinct_values('LOCATIONNAME')
def get_vendors() -> List[str]:
"""取得供應商清單(便捷方法)."""
return get_distinct_values('VENDORNAME')

File diff suppressed because it is too large Load Diff

View File

@@ -1,403 +1,403 @@
# -*- coding: utf-8 -*-
"""Resource (Equipment) query services for MES Dashboard.
Provides functions to query equipment status from DW_MES_RESOURCE and DW_MES_RESOURCESTATUS tables.
"""
import pandas as pd
from typing import Optional, Dict, List, Any
from mes_dashboard.core.database import get_db_connection, read_sql_df
from mes_dashboard.core.utils import get_days_back, build_equipment_filter_sql
from mes_dashboard.config.constants import (
EXCLUDED_LOCATIONS,
EXCLUDED_ASSET_STATUSES,
DEFAULT_DAYS_BACK,
)
# ============================================================
# Resource Base Subquery
# ============================================================
def get_resource_latest_status_subquery(days_back: int = 30) -> str:
"""Returns subquery to get latest status per resource.
Filter conditions:
- (OBJECTCATEGORY = 'ASSEMBLY' AND OBJECTTYPE = 'ASSEMBLY') OR
(OBJECTCATEGORY = 'WAFERSORT' AND OBJECTTYPE = 'WAFERSORT')
- Excludes specified locations and asset statuses
Uses ROW_NUMBER() for performance.
Only scans recent status changes (default 30 days).
Includes JOBID for SDT/UDT drill-down.
Includes PJ_LOTID from RESOURCE table.
Args:
days_back: Number of days to look back
Returns:
SQL subquery string for latest resource status.
"""
# Build exclusion filters
location_filter = ""
if EXCLUDED_LOCATIONS:
excluded_locations = "', '".join(EXCLUDED_LOCATIONS)
location_filter = f"AND (r.LOCATIONNAME IS NULL OR r.LOCATIONNAME NOT IN ('{excluded_locations}'))"
asset_status_filter = ""
if EXCLUDED_ASSET_STATUSES:
excluded_assets = "', '".join(EXCLUDED_ASSET_STATUSES)
asset_status_filter = f"AND (r.PJ_ASSETSSTATUS IS NULL OR r.PJ_ASSETSSTATUS NOT IN ('{excluded_assets}'))"
return f"""
WITH latest_txn AS (
SELECT MAX(COALESCE(TXNDATE, LASTSTATUSCHANGEDATE)) AS MAX_TXNDATE
FROM DW_MES_RESOURCESTATUS
)
SELECT *
FROM (
SELECT
r.RESOURCEID,
r.RESOURCENAME,
r.OBJECTCATEGORY,
r.OBJECTTYPE,
r.RESOURCEFAMILYNAME,
r.WORKCENTERNAME,
r.LOCATIONNAME,
r.VENDORNAME,
r.VENDORMODEL,
r.PJ_DEPARTMENT,
r.PJ_ASSETSSTATUS,
r.PJ_ISPRODUCTION,
r.PJ_ISKEY,
r.PJ_ISMONITOR,
r.PJ_LOTID,
r.DESCRIPTION,
s.NEWSTATUSNAME,
s.NEWREASONNAME,
s.LASTSTATUSCHANGEDATE,
s.OLDSTATUSNAME,
s.OLDREASONNAME,
s.AVAILABILITY,
s.JOBID,
s.TXNDATE,
ROW_NUMBER() OVER (
PARTITION BY r.RESOURCEID
ORDER BY s.LASTSTATUSCHANGEDATE DESC NULLS LAST,
COALESCE(s.TXNDATE, s.LASTSTATUSCHANGEDATE) DESC
) AS rn
FROM DW_MES_RESOURCE r
JOIN DW_MES_RESOURCESTATUS s ON r.RESOURCEID = s.HISTORYID
CROSS JOIN latest_txn lt
WHERE ((r.OBJECTCATEGORY = 'ASSEMBLY' AND r.OBJECTTYPE = 'ASSEMBLY')
OR (r.OBJECTCATEGORY = 'WAFERSORT' AND r.OBJECTTYPE = 'WAFERSORT'))
AND COALESCE(s.TXNDATE, s.LASTSTATUSCHANGEDATE) >= lt.MAX_TXNDATE - {days_back}
{location_filter}
{asset_status_filter}
)
WHERE rn = 1
"""
# ============================================================
# Resource Summary Queries
# ============================================================
def query_resource_status_summary(days_back: int = 30) -> Optional[Dict]:
"""Query resource status summary statistics.
Args:
days_back: Number of days to look back
Returns:
Dict with summary stats or None if query fails.
"""
connection = get_db_connection()
if not connection:
return None
try:
sql = f"""
SELECT
COUNT(*) as TOTAL_COUNT,
COUNT(DISTINCT WORKCENTERNAME) as WORKCENTER_COUNT,
COUNT(DISTINCT RESOURCEFAMILYNAME) as FAMILY_COUNT,
COUNT(DISTINCT PJ_DEPARTMENT) as DEPT_COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
"""
cursor = connection.cursor()
cursor.execute(sql)
result = cursor.fetchone()
cursor.close()
connection.close()
if not result:
return None
return {
'total_count': result[0] or 0,
'workcenter_count': result[1] or 0,
'family_count': result[2] or 0,
'dept_count': result[3] or 0
}
except Exception as exc:
if connection:
connection.close()
print(f"Resource summary query failed: {exc}")
return None
def query_resource_by_status(days_back: int = 30) -> Optional[pd.DataFrame]:
"""Query resource count grouped by status.
Args:
days_back: Number of days to look back
Returns:
DataFrame with status counts or None if query fails.
"""
try:
sql = f"""
SELECT
NEWSTATUSNAME,
COUNT(*) as COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
WHERE NEWSTATUSNAME IS NOT NULL
GROUP BY NEWSTATUSNAME
ORDER BY COUNT DESC
"""
return read_sql_df(sql)
except Exception as exc:
print(f"Resource by status query failed: {exc}")
return None
def query_resource_by_workcenter(days_back: int = 30) -> Optional[pd.DataFrame]:
"""Query resource count grouped by workcenter and status.
Args:
days_back: Number of days to look back
Returns:
DataFrame with workcenter/status counts or None if query fails.
"""
try:
sql = f"""
SELECT
WORKCENTERNAME,
NEWSTATUSNAME,
COUNT(*) as COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
WHERE WORKCENTERNAME IS NOT NULL
GROUP BY WORKCENTERNAME, NEWSTATUSNAME
ORDER BY WORKCENTERNAME, COUNT DESC
"""
return read_sql_df(sql)
except Exception as exc:
print(f"Resource by workcenter query failed: {exc}")
return None
def query_resource_detail(
filters: Optional[Dict] = None,
limit: int = 500,
offset: int = 0,
days_back: int = 30
) -> Optional[pd.DataFrame]:
"""Query resource detail with optional filters.
Args:
filters: Optional filter values
limit: Maximum rows to return
offset: Offset for pagination
days_back: Number of days to look back
Returns:
DataFrame with resource details or None if query fails.
"""
try:
base_sql = get_resource_latest_status_subquery(days_back)
where_conditions = []
if filters:
if filters.get('workcenter'):
where_conditions.append(f"WORKCENTERNAME = '{filters['workcenter']}'")
if filters.get('status'):
where_conditions.append(f"NEWSTATUSNAME = '{filters['status']}'")
if filters.get('family'):
where_conditions.append(f"RESOURCEFAMILYNAME = '{filters['family']}'")
if filters.get('department'):
where_conditions.append(f"PJ_DEPARTMENT = '{filters['department']}'")
# Equipment flag filters
if filters.get('isProduction') is not None:
where_conditions.append(
f"NVL(PJ_ISPRODUCTION, 0) = {1 if filters['isProduction'] else 0}"
)
if filters.get('isKey') is not None:
where_conditions.append(
f"NVL(PJ_ISKEY, 0) = {1 if filters['isKey'] else 0}"
)
if filters.get('isMonitor') is not None:
where_conditions.append(
f"NVL(PJ_ISMONITOR, 0) = {1 if filters['isMonitor'] else 0}"
)
where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
start_row = offset + 1
end_row = offset + limit
sql = f"""
SELECT * FROM (
SELECT
RESOURCENAME,
WORKCENTERNAME,
RESOURCEFAMILYNAME,
NEWSTATUSNAME,
NEWREASONNAME,
LASTSTATUSCHANGEDATE,
PJ_DEPARTMENT,
VENDORNAME,
VENDORMODEL,
PJ_ASSETSSTATUS,
AVAILABILITY,
PJ_ISPRODUCTION,
PJ_ISKEY,
PJ_ISMONITOR,
ROW_NUMBER() OVER (
ORDER BY LASTSTATUSCHANGEDATE DESC NULLS LAST
) AS rn
FROM ({base_sql}) rs
WHERE 1=1 {where_clause}
) WHERE rn BETWEEN {start_row} AND {end_row}
"""
df = read_sql_df(sql)
# Convert datetime to string
if 'LASTSTATUSCHANGEDATE' in df.columns:
df['LASTSTATUSCHANGEDATE'] = df['LASTSTATUSCHANGEDATE'].apply(
lambda x: x.strftime('%Y-%m-%d %H:%M:%S') if pd.notna(x) else None
)
return df
except Exception as exc:
print(f"Resource detail query failed: {exc}")
return None
def query_resource_workcenter_status_matrix(days_back: int = 30) -> Optional[pd.DataFrame]:
"""Query resource count matrix by workcenter and status category.
Status values in database:
- PRD: Productive
- SBY: Standby
- UDT: Unscheduled Down Time
- SDT: Scheduled Down Time
- EGT: Engineering Time
- NST: Not Scheduled Time
Args:
days_back: Number of days to look back
Returns:
DataFrame with workcenter/status matrix or None if query fails.
"""
try:
sql = f"""
SELECT
WORKCENTERNAME,
CASE NEWSTATUSNAME
WHEN 'PRD' THEN 'PRD'
WHEN 'SBY' THEN 'SBY'
WHEN 'UDT' THEN 'UDT'
WHEN 'SDT' THEN 'SDT'
WHEN 'EGT' THEN 'EGT'
WHEN 'NST' THEN 'NST'
WHEN 'SCRAP' THEN 'SCRAP'
ELSE 'OTHER'
END as STATUS_CATEGORY,
NEWSTATUSNAME,
COUNT(*) as COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
WHERE WORKCENTERNAME IS NOT NULL
GROUP BY WORKCENTERNAME,
CASE NEWSTATUSNAME
WHEN 'PRD' THEN 'PRD'
WHEN 'SBY' THEN 'SBY'
WHEN 'UDT' THEN 'UDT'
WHEN 'SDT' THEN 'SDT'
WHEN 'EGT' THEN 'EGT'
WHEN 'NST' THEN 'NST'
WHEN 'SCRAP' THEN 'SCRAP'
ELSE 'OTHER'
END,
NEWSTATUSNAME
ORDER BY WORKCENTERNAME, STATUS_CATEGORY
"""
return read_sql_df(sql)
except Exception as exc:
print(f"Resource status matrix query failed: {exc}")
return None
def query_resource_filter_options(days_back: int = 30) -> Optional[Dict]:
"""Get available filter options for resource queries.
Uses resource_cache for static resource data (workcenters, families, departments, locations).
Only queries Oracle for dynamic status data.
Args:
days_back: Number of days to look back
Returns:
Dict with filter options or None if query fails.
"""
from mes_dashboard.services.resource_cache import (
get_workcenters,
get_resource_families,
get_departments,
get_locations,
get_distinct_values,
)
try:
# Get static filter options from resource cache
workcenters = get_workcenters()
families = get_resource_families()
departments = get_departments()
locations = get_locations()
assets_statuses = get_distinct_values('PJ_ASSETSSTATUS')
# Query only dynamic status data from Oracle
# Note: Can't wrap CTE in subquery, so use inline approach
sql_statuses = f"""
WITH latest_txn AS (
SELECT MAX(COALESCE(TXNDATE, LASTSTATUSCHANGEDATE)) AS MAX_TXNDATE
FROM DW_MES_RESOURCESTATUS
)
SELECT DISTINCT s.NEWSTATUSNAME
FROM DW_MES_RESOURCE r
JOIN DW_MES_RESOURCESTATUS s ON r.RESOURCEID = s.HISTORYID
CROSS JOIN latest_txn lt
WHERE ((r.OBJECTCATEGORY = 'ASSEMBLY' AND r.OBJECTTYPE = 'ASSEMBLY')
OR (r.OBJECTCATEGORY = 'WAFERSORT' AND r.OBJECTTYPE = 'WAFERSORT'))
AND COALESCE(s.TXNDATE, s.LASTSTATUSCHANGEDATE) >= lt.MAX_TXNDATE - {days_back}
AND s.NEWSTATUSNAME IS NOT NULL
ORDER BY s.NEWSTATUSNAME
"""
status_df = read_sql_df(sql_statuses)
statuses = status_df['NEWSTATUSNAME'].tolist() if status_df is not None else []
return {
'workcenters': workcenters,
'statuses': statuses,
'families': families,
'departments': departments,
'locations': locations,
'assets_statuses': assets_statuses
}
except Exception as exc:
print(f"Resource filter options query failed: {exc}")
import traceback
traceback.print_exc()
return None
# -*- coding: utf-8 -*-
"""Resource (Equipment) query services for MES Dashboard.
Provides functions to query equipment status from DWH.DW_MES_RESOURCE and DWH.DW_MES_RESOURCESTATUS tables.
"""
import pandas as pd
from typing import Optional, Dict, List, Any
from mes_dashboard.core.database import get_db_connection, read_sql_df
from mes_dashboard.core.utils import get_days_back, build_equipment_filter_sql
from mes_dashboard.config.constants import (
EXCLUDED_LOCATIONS,
EXCLUDED_ASSET_STATUSES,
DEFAULT_DAYS_BACK,
)
# ============================================================
# Resource Base Subquery
# ============================================================
def get_resource_latest_status_subquery(days_back: int = 30) -> str:
"""Returns subquery to get latest status per resource.
Filter conditions:
- (OBJECTCATEGORY = 'ASSEMBLY' AND OBJECTTYPE = 'ASSEMBLY') OR
(OBJECTCATEGORY = 'WAFERSORT' AND OBJECTTYPE = 'WAFERSORT')
- Excludes specified locations and asset statuses
Uses ROW_NUMBER() for performance.
Only scans recent status changes (default 30 days).
Includes JOBID for SDT/UDT drill-down.
Includes PJ_LOTID from RESOURCE table.
Args:
days_back: Number of days to look back
Returns:
SQL subquery string for latest resource status.
"""
# Build exclusion filters
location_filter = ""
if EXCLUDED_LOCATIONS:
excluded_locations = "', '".join(EXCLUDED_LOCATIONS)
location_filter = f"AND (r.LOCATIONNAME IS NULL OR r.LOCATIONNAME NOT IN ('{excluded_locations}'))"
asset_status_filter = ""
if EXCLUDED_ASSET_STATUSES:
excluded_assets = "', '".join(EXCLUDED_ASSET_STATUSES)
asset_status_filter = f"AND (r.PJ_ASSETSSTATUS IS NULL OR r.PJ_ASSETSSTATUS NOT IN ('{excluded_assets}'))"
return f"""
WITH latest_txn AS (
SELECT MAX(COALESCE(TXNDATE, LASTSTATUSCHANGEDATE)) AS MAX_TXNDATE
FROM DWH.DW_MES_RESOURCESTATUS
)
SELECT *
FROM (
SELECT
r.RESOURCEID,
r.RESOURCENAME,
r.OBJECTCATEGORY,
r.OBJECTTYPE,
r.RESOURCEFAMILYNAME,
r.WORKCENTERNAME,
r.LOCATIONNAME,
r.VENDORNAME,
r.VENDORMODEL,
r.PJ_DEPARTMENT,
r.PJ_ASSETSSTATUS,
r.PJ_ISPRODUCTION,
r.PJ_ISKEY,
r.PJ_ISMONITOR,
r.PJ_LOTID,
r.DESCRIPTION,
s.NEWSTATUSNAME,
s.NEWREASONNAME,
s.LASTSTATUSCHANGEDATE,
s.OLDSTATUSNAME,
s.OLDREASONNAME,
s.AVAILABILITY,
s.JOBID,
s.TXNDATE,
ROW_NUMBER() OVER (
PARTITION BY r.RESOURCEID
ORDER BY s.LASTSTATUSCHANGEDATE DESC NULLS LAST,
COALESCE(s.TXNDATE, s.LASTSTATUSCHANGEDATE) DESC
) AS rn
FROM DWH.DW_MES_RESOURCE r
JOIN DWH.DW_MES_RESOURCESTATUS s ON r.RESOURCEID = s.HISTORYID
CROSS JOIN latest_txn lt
WHERE ((r.OBJECTCATEGORY = 'ASSEMBLY' AND r.OBJECTTYPE = 'ASSEMBLY')
OR (r.OBJECTCATEGORY = 'WAFERSORT' AND r.OBJECTTYPE = 'WAFERSORT'))
AND COALESCE(s.TXNDATE, s.LASTSTATUSCHANGEDATE) >= lt.MAX_TXNDATE - {days_back}
{location_filter}
{asset_status_filter}
)
WHERE rn = 1
"""
# ============================================================
# Resource Summary Queries
# ============================================================
def query_resource_status_summary(days_back: int = 30) -> Optional[Dict]:
"""Query resource status summary statistics.
Args:
days_back: Number of days to look back
Returns:
Dict with summary stats or None if query fails.
"""
connection = get_db_connection()
if not connection:
return None
try:
sql = f"""
SELECT
COUNT(*) as TOTAL_COUNT,
COUNT(DISTINCT WORKCENTERNAME) as WORKCENTER_COUNT,
COUNT(DISTINCT RESOURCEFAMILYNAME) as FAMILY_COUNT,
COUNT(DISTINCT PJ_DEPARTMENT) as DEPT_COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
"""
cursor = connection.cursor()
cursor.execute(sql)
result = cursor.fetchone()
cursor.close()
connection.close()
if not result:
return None
return {
'total_count': result[0] or 0,
'workcenter_count': result[1] or 0,
'family_count': result[2] or 0,
'dept_count': result[3] or 0
}
except Exception as exc:
if connection:
connection.close()
print(f"Resource summary query failed: {exc}")
return None
def query_resource_by_status(days_back: int = 30) -> Optional[pd.DataFrame]:
"""Query resource count grouped by status.
Args:
days_back: Number of days to look back
Returns:
DataFrame with status counts or None if query fails.
"""
try:
sql = f"""
SELECT
NEWSTATUSNAME,
COUNT(*) as COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
WHERE NEWSTATUSNAME IS NOT NULL
GROUP BY NEWSTATUSNAME
ORDER BY COUNT DESC
"""
return read_sql_df(sql)
except Exception as exc:
print(f"Resource by status query failed: {exc}")
return None
def query_resource_by_workcenter(days_back: int = 30) -> Optional[pd.DataFrame]:
"""Query resource count grouped by workcenter and status.
Args:
days_back: Number of days to look back
Returns:
DataFrame with workcenter/status counts or None if query fails.
"""
try:
sql = f"""
SELECT
WORKCENTERNAME,
NEWSTATUSNAME,
COUNT(*) as COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
WHERE WORKCENTERNAME IS NOT NULL
GROUP BY WORKCENTERNAME, NEWSTATUSNAME
ORDER BY WORKCENTERNAME, COUNT DESC
"""
return read_sql_df(sql)
except Exception as exc:
print(f"Resource by workcenter query failed: {exc}")
return None
def query_resource_detail(
filters: Optional[Dict] = None,
limit: int = 500,
offset: int = 0,
days_back: int = 30
) -> Optional[pd.DataFrame]:
"""Query resource detail with optional filters.
Args:
filters: Optional filter values
limit: Maximum rows to return
offset: Offset for pagination
days_back: Number of days to look back
Returns:
DataFrame with resource details or None if query fails.
"""
try:
base_sql = get_resource_latest_status_subquery(days_back)
where_conditions = []
if filters:
if filters.get('workcenter'):
where_conditions.append(f"WORKCENTERNAME = '{filters['workcenter']}'")
if filters.get('status'):
where_conditions.append(f"NEWSTATUSNAME = '{filters['status']}'")
if filters.get('family'):
where_conditions.append(f"RESOURCEFAMILYNAME = '{filters['family']}'")
if filters.get('department'):
where_conditions.append(f"PJ_DEPARTMENT = '{filters['department']}'")
# Equipment flag filters
if filters.get('isProduction') is not None:
where_conditions.append(
f"NVL(PJ_ISPRODUCTION, 0) = {1 if filters['isProduction'] else 0}"
)
if filters.get('isKey') is not None:
where_conditions.append(
f"NVL(PJ_ISKEY, 0) = {1 if filters['isKey'] else 0}"
)
if filters.get('isMonitor') is not None:
where_conditions.append(
f"NVL(PJ_ISMONITOR, 0) = {1 if filters['isMonitor'] else 0}"
)
where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
start_row = offset + 1
end_row = offset + limit
sql = f"""
SELECT * FROM (
SELECT
RESOURCENAME,
WORKCENTERNAME,
RESOURCEFAMILYNAME,
NEWSTATUSNAME,
NEWREASONNAME,
LASTSTATUSCHANGEDATE,
PJ_DEPARTMENT,
VENDORNAME,
VENDORMODEL,
PJ_ASSETSSTATUS,
AVAILABILITY,
PJ_ISPRODUCTION,
PJ_ISKEY,
PJ_ISMONITOR,
ROW_NUMBER() OVER (
ORDER BY LASTSTATUSCHANGEDATE DESC NULLS LAST
) AS rn
FROM ({base_sql}) rs
WHERE 1=1 {where_clause}
) WHERE rn BETWEEN {start_row} AND {end_row}
"""
df = read_sql_df(sql)
# Convert datetime to string
if 'LASTSTATUSCHANGEDATE' in df.columns:
df['LASTSTATUSCHANGEDATE'] = df['LASTSTATUSCHANGEDATE'].apply(
lambda x: x.strftime('%Y-%m-%d %H:%M:%S') if pd.notna(x) else None
)
return df
except Exception as exc:
print(f"Resource detail query failed: {exc}")
return None
def query_resource_workcenter_status_matrix(days_back: int = 30) -> Optional[pd.DataFrame]:
"""Query resource count matrix by workcenter and status category.
Status values in database:
- PRD: Productive
- SBY: Standby
- UDT: Unscheduled Down Time
- SDT: Scheduled Down Time
- EGT: Engineering Time
- NST: Not Scheduled Time
Args:
days_back: Number of days to look back
Returns:
DataFrame with workcenter/status matrix or None if query fails.
"""
try:
sql = f"""
SELECT
WORKCENTERNAME,
CASE NEWSTATUSNAME
WHEN 'PRD' THEN 'PRD'
WHEN 'SBY' THEN 'SBY'
WHEN 'UDT' THEN 'UDT'
WHEN 'SDT' THEN 'SDT'
WHEN 'EGT' THEN 'EGT'
WHEN 'NST' THEN 'NST'
WHEN 'SCRAP' THEN 'SCRAP'
ELSE 'OTHER'
END as STATUS_CATEGORY,
NEWSTATUSNAME,
COUNT(*) as COUNT
FROM ({get_resource_latest_status_subquery(days_back)}) rs
WHERE WORKCENTERNAME IS NOT NULL
GROUP BY WORKCENTERNAME,
CASE NEWSTATUSNAME
WHEN 'PRD' THEN 'PRD'
WHEN 'SBY' THEN 'SBY'
WHEN 'UDT' THEN 'UDT'
WHEN 'SDT' THEN 'SDT'
WHEN 'EGT' THEN 'EGT'
WHEN 'NST' THEN 'NST'
WHEN 'SCRAP' THEN 'SCRAP'
ELSE 'OTHER'
END,
NEWSTATUSNAME
ORDER BY WORKCENTERNAME, STATUS_CATEGORY
"""
return read_sql_df(sql)
except Exception as exc:
print(f"Resource status matrix query failed: {exc}")
return None
def query_resource_filter_options(days_back: int = 30) -> Optional[Dict]:
"""Get available filter options for resource queries.
Uses resource_cache for static resource data (workcenters, families, departments, locations).
Only queries Oracle for dynamic status data.
Args:
days_back: Number of days to look back
Returns:
Dict with filter options or None if query fails.
"""
from mes_dashboard.services.resource_cache import (
get_workcenters,
get_resource_families,
get_departments,
get_locations,
get_distinct_values,
)
try:
# Get static filter options from resource cache
workcenters = get_workcenters()
families = get_resource_families()
departments = get_departments()
locations = get_locations()
assets_statuses = get_distinct_values('PJ_ASSETSSTATUS')
# Query only dynamic status data from Oracle
# Note: Can't wrap CTE in subquery, so use inline approach
sql_statuses = f"""
WITH latest_txn AS (
SELECT MAX(COALESCE(TXNDATE, LASTSTATUSCHANGEDATE)) AS MAX_TXNDATE
FROM DWH.DW_MES_RESOURCESTATUS
)
SELECT DISTINCT s.NEWSTATUSNAME
FROM DWH.DW_MES_RESOURCE r
JOIN DWH.DW_MES_RESOURCESTATUS s ON r.RESOURCEID = s.HISTORYID
CROSS JOIN latest_txn lt
WHERE ((r.OBJECTCATEGORY = 'ASSEMBLY' AND r.OBJECTTYPE = 'ASSEMBLY')
OR (r.OBJECTCATEGORY = 'WAFERSORT' AND r.OBJECTTYPE = 'WAFERSORT'))
AND COALESCE(s.TXNDATE, s.LASTSTATUSCHANGEDATE) >= lt.MAX_TXNDATE - {days_back}
AND s.NEWSTATUSNAME IS NOT NULL
ORDER BY s.NEWSTATUSNAME
"""
status_df = read_sql_df(sql_statuses)
statuses = status_df['NEWSTATUSNAME'].tolist() if status_df is not None else []
return {
'workcenters': workcenters,
'statuses': statuses,
'families': families,
'departments': departments,
'locations': locations,
'assets_statuses': assets_statuses
}
except Exception as exc:
print(f"Resource filter options query failed: {exc}")
import traceback
traceback.print_exc()
return None

File diff suppressed because it is too large Load Diff