feat: dataset cache for hold/resource history + slow connection migration

Two changes combined:

1. historical-query-slow-connection: Migrate all historical query pages
   to read_sql_df_slow with semaphore concurrency control (max 3),
   raise DB slow timeout to 300s, gunicorn timeout to 360s, and
   unify frontend timeouts to 360s for all historical pages.

2. hold-resource-history-dataset-cache: Convert hold-history and
   resource-history from multi-query to single-query + dataset cache
   pattern (L1 ProcessLevelCache + L2 Redis parquet/base64, TTL=900s).
   Replace old GET endpoints with POST /query + GET /view two-phase
   API. Frontend auto-retries on 410 cache_expired.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
egg
2026-02-25 13:15:02 +08:00
parent cd061e0cfd
commit 71c8102de6
64 changed files with 3806 additions and 1442 deletions

View File

@@ -0,0 +1,553 @@
# -*- coding: utf-8 -*-
"""Two-phase hold-history dataset cache.
Primary query (POST /query) → Oracle → cache full hold/release DataFrame.
Supplementary view (GET /view) → read cache → pandas filter/derive.
Cache layers:
L1: ProcessLevelCache (in-process, per-worker)
L2: Redis (cross-worker, parquet bytes encoded as base64 string)
"""
from __future__ import annotations
import base64
import hashlib
import io
import json
import logging
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
from mes_dashboard.core.cache import ProcessLevelCache, register_process_cache
from mes_dashboard.core.database import read_sql_df_slow as read_sql_df
from mes_dashboard.core.redis_client import (
REDIS_ENABLED,
get_key,
get_redis_client,
)
from mes_dashboard.services.filter_cache import get_workcenter_group as _get_wc_group
from mes_dashboard.services.hold_history_service import (
_clean_text,
_format_datetime,
_safe_float,
_safe_int,
)
from mes_dashboard.sql.filters import CommonFilters
logger = logging.getLogger("mes_dashboard.hold_dataset_cache")
# Shared TTL for both cache layers (L1 process cache and L2 Redis).
_CACHE_TTL = 900  # 15 minutes
# Max datasets kept per worker process; oldest entries are evicted beyond this.
_CACHE_MAX_SIZE = 8
# Key prefix for the L2 Redis entries of this module.
_REDIS_NAMESPACE = "hold_dataset"
# L1 cache: in-process, per-gunicorn-worker.
_dataset_cache = ProcessLevelCache(ttl_seconds=_CACHE_TTL, max_size=_CACHE_MAX_SIZE)
# Register with the app-wide cache registry so ops tooling can inspect/clear it.
register_process_cache("hold_dataset", _dataset_cache, "Hold Dataset (L1, 15min)")
# Directory holding the hold-history SQL templates (…/sql/hold_history).
_SQL_DIR = Path(__file__).resolve().parent.parent / "sql" / "hold_history"
# ============================================================
# SQL loading
# ============================================================
@lru_cache(maxsize=4)
def _load_sql(name: str) -> str:
    """Load the SQL template ``<name>.sql`` and expand known placeholders.

    The ``{{ NON_QUALITY_REASONS }}`` token, when present, is substituted
    with the SQL fragment produced by CommonFilters. Results are memoized.
    """
    sql_text = (_SQL_DIR / f"{name}.sql").read_text(encoding="utf-8")
    placeholder = "{{ NON_QUALITY_REASONS }}"
    if placeholder in sql_text:
        replacement = CommonFilters.get_non_quality_reasons_sql()
        sql_text = sql_text.replace(placeholder, replacement)
    return sql_text
# ============================================================
# Query ID
# ============================================================
def _make_query_id(params: dict) -> str:
"""Deterministic hash from primary query params."""
canonical = json.dumps(params, sort_keys=True, ensure_ascii=False, default=str)
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
# ============================================================
# Redis L2 helpers (parquet <-> base64 string)
# ============================================================
def _redis_key(query_id: str) -> str:
    """Build the namespaced Redis key for one cached dataset."""
    return get_key(":".join((_REDIS_NAMESPACE, query_id)))
def _redis_store_df(query_id: str, df: pd.DataFrame) -> None:
    """Persist *df* to Redis (L2) as base64-encoded parquet bytes.

    Best-effort: any Redis/serialization failure is logged and swallowed so
    the request still succeeds on the L1 cache alone.
    """
    client = get_redis_client() if REDIS_ENABLED else None
    if client is None:
        return
    try:
        buffer = io.BytesIO()
        df.to_parquet(buffer, engine="pyarrow", index=False)
        payload = base64.b64encode(buffer.getvalue()).decode("ascii")
        client.setex(_redis_key(query_id), _CACHE_TTL, payload)
    except Exception as exc:
        logger.warning("Failed to store DataFrame in Redis: %s", exc)
def _redis_load_df(query_id: str) -> Optional[pd.DataFrame]:
    """Fetch a cached DataFrame from Redis (L2).

    Returns None on cache miss, Redis being disabled/unavailable, or any
    decode failure (which is logged, not raised).
    """
    client = get_redis_client() if REDIS_ENABLED else None
    if client is None:
        return None
    try:
        payload = client.get(_redis_key(query_id))
        if payload is None:
            return None
        raw_bytes = base64.b64decode(payload)
        return pd.read_parquet(io.BytesIO(raw_bytes), engine="pyarrow")
    except Exception as exc:
        logger.warning("Failed to load DataFrame from Redis: %s", exc)
        return None
# ============================================================
# Cache read (L1 -> L2 -> None)
# ============================================================
def _get_cached_df(query_id: str) -> Optional[pd.DataFrame]:
    """Resolve a dataset through the cache hierarchy: L1, then L2, else None."""
    hit = _dataset_cache.get(query_id)
    if hit is None:
        hit = _redis_load_df(query_id)
        if hit is not None:
            # Promote the L2 hit into this worker's L1 cache.
            _dataset_cache.set(query_id, hit)
    return hit
def _store_df(query_id: str, df: pd.DataFrame) -> None:
    """Populate both cache layers (L1 process cache, then L2 Redis)."""
    _dataset_cache.set(query_id, df)
    # L2 write is best-effort; failures are logged inside the helper.
    _redis_store_df(query_id, df)
# ============================================================
# Primary query
# ============================================================
def execute_primary_query(
    *,
    start_date: str,
    end_date: str,
    hold_type: str = "quality",
    record_type: str = "new",
) -> Dict[str, Any]:
    """Run (or reuse) the primary Oracle query and return first-page views.

    The cache key depends only on the date window; ``hold_type`` and
    ``record_type`` are applied afterwards as in-memory filters, so every
    view variant is served from the same cached dataset.
    """
    query_id = _make_query_id({"start_date": start_date, "end_date": end_date})
    df = _get_cached_df(query_id)
    if df is None:
        logger.info(
            "Hold dataset cache miss for query_id=%s, querying Oracle", query_id
        )
        df = read_sql_df(
            _load_sql("base_facts"),
            {"start_date": start_date, "end_date": end_date},
        )
        if df is None:
            df = pd.DataFrame()
        if not df.empty:
            # Stamp the query window so view derivation can recover it later.
            df["_QUERY_START"] = pd.Timestamp(start_date)
            df["_QUERY_END"] = pd.Timestamp(end_date)
        _store_df(query_id, df)
    else:
        logger.info("Hold dataset cache hit for query_id=%s", query_id)
    views = _derive_all_views(
        df,
        hold_type=hold_type,
        record_type=record_type,
        page=1,
        per_page=50,
    )
    return {"query_id": query_id, **views}
# ============================================================
# View (supplementary filtering on cache)
# ============================================================
def apply_view(
    *,
    query_id: str,
    hold_type: str = "quality",
    reason: Optional[str] = None,
    record_type: str = "new",
    duration_range: Optional[str] = None,
    page: int = 1,
    per_page: int = 50,
) -> Optional[Dict[str, Any]]:
    """Re-derive views from the cached dataset without touching Oracle.

    Returns None when the cache entry has expired, so the API layer can
    signal the frontend to re-run the primary query.
    """
    cached = _get_cached_df(query_id)
    if cached is None:
        return None
    return _derive_all_views(
        cached,
        hold_type=hold_type,
        reason=reason,
        record_type=record_type,
        duration_range=duration_range,
        page=page,
        per_page=per_page,
    )
# ============================================================
# Master derivation
# ============================================================
def _derive_all_views(
    df: pd.DataFrame,
    *,
    hold_type: str = "quality",
    reason: Optional[str] = None,
    record_type: str = "new",
    duration_range: Optional[str] = None,
    page: int = 1,
    per_page: int = 50,
) -> Dict[str, Any]:
    """Compute trend, reason Pareto, duration, and list views from the cache.

    Filter scope differs per view: trend uses the full dataset; the Pareto
    and duration views get the record_type + hold_type filters; the list
    additionally gets the reason and duration_range filters.
    """
    if df is None or df.empty:
        return _empty_views()
    subset = _apply_record_type_filter(df, record_type)
    if hold_type != "all":
        # Any value other than "non-quality" is treated as "quality".
        wanted = "non-quality" if hold_type == "non-quality" else "quality"
        subset = subset[subset["HOLD_TYPE"] == wanted]
    detail_df = subset
    if reason:
        detail_df = detail_df[
            detail_df["HOLDREASONNAME"].str.strip() == reason.strip()
        ]
    if duration_range:
        detail_df = _apply_duration_range_filter(detail_df, duration_range)
    return {
        "trend": _derive_trend(df),
        "reason_pareto": _derive_reason_pareto(subset),
        "duration": _derive_duration(subset),
        "list": _derive_list(detail_df, page=page, per_page=per_page),
    }
def _empty_views() -> Dict[str, Any]:
return {
"trend": {"days": []},
"reason_pareto": {"items": []},
"duration": {"items": []},
"list": {
"items": [],
"pagination": {
"page": 1,
"perPage": 50,
"total": 0,
"totalPages": 1,
},
},
}
# ============================================================
# Record-type & duration-range filters
# ============================================================
def _apply_record_type_filter(
df: pd.DataFrame, record_type: str
) -> pd.DataFrame:
if df is None or df.empty:
return df
types = {t.strip().lower() for t in str(record_type or "new").split(",")}
if types >= {"new", "on_hold", "released"}:
return df
mask = pd.Series(False, index=df.index)
if "new" in types:
if "_QUERY_START" in df.columns and "_QUERY_END" in df.columns:
qs = df["_QUERY_START"].iloc[0]
qe = df["_QUERY_END"].iloc[0]
mask |= (df["HOLD_DAY"] >= qs) & (df["HOLD_DAY"] <= qe)
else:
mask |= pd.Series(True, index=df.index)
if "on_hold" in types:
mask |= df["RELEASETXNDATE"].isna()
if "released" in types:
mask |= df["RELEASETXNDATE"].notna()
return df[mask]
def _apply_duration_range_filter(
df: pd.DataFrame, duration_range: str
) -> pd.DataFrame:
if df is None or df.empty or not duration_range:
return df
hours = df["HOLD_HOURS"]
if duration_range == "<4h":
return df[hours < 4]
if duration_range == "4-24h":
return df[(hours >= 4) & (hours < 24)]
if duration_range == "1-3d":
return df[(hours >= 24) & (hours < 72)]
if duration_range == ">3d":
return df[hours >= 72]
return df
# ============================================================
# Derivation: Trend
# ============================================================
def _derive_trend(df: pd.DataFrame) -> Dict[str, Any]:
"""Derive daily trend with quality / non_quality / all variants."""
if df is None or df.empty:
return {"days": []}
if "_QUERY_START" in df.columns:
start = pd.Timestamp(df["_QUERY_START"].iloc[0])
end = pd.Timestamp(df["_QUERY_END"].iloc[0])
else:
start = df["HOLD_DAY"].min()
end = df["HOLD_DAY"].max()
dates = pd.date_range(start, end, freq="D")
type_map: List[tuple] = [
("quality", "quality"),
("non_quality", "non-quality"),
("all", None),
]
days: List[Dict[str, Any]] = []
for d in dates:
day_data: Dict[str, Any] = {"date": d.strftime("%Y-%m-%d")}
for key, type_filter in type_map:
tdf = df if type_filter is None else df[df["HOLD_TYPE"] == type_filter]
if tdf.empty:
day_data[key] = _empty_trend_metrics()
continue
# holdQty: distinct containers on hold as of this day
on_hold = (tdf["HOLD_DAY"] <= d) & (
tdf["RELEASE_DAY"].isna() | (tdf["RELEASE_DAY"] > d)
)
hold_qty = int(tdf.loc[on_hold, "CONTAINERID"].nunique())
# newHoldQty: new holds arriving this day (dedup)
new_mask = (tdf["HOLD_DAY"] == d) & (tdf["RN_HOLD_DAY"] == 1)
new_hold_qty = int(new_mask.sum())
# releaseQty: releases on this day
release_qty = int((tdf["RELEASE_DAY"] == d).sum())
# futureHoldQty: future holds on this day
future_mask = (
(tdf["HOLD_DAY"] == d)
& (tdf["IS_FUTURE_HOLD"] == 1)
& (tdf["FUTURE_HOLD_FLAG"] == 1)
)
future_hold_qty = int(future_mask.sum())
day_data[key] = {
"holdQty": hold_qty,
"newHoldQty": new_hold_qty,
"releaseQty": release_qty,
"futureHoldQty": future_hold_qty,
}
days.append(day_data)
return {"days": days}
def _empty_trend_metrics() -> Dict[str, int]:
return {"holdQty": 0, "newHoldQty": 0, "releaseQty": 0, "futureHoldQty": 0}
# ============================================================
# Derivation: Reason Pareto
# ============================================================
def _derive_reason_pareto(df: pd.DataFrame) -> Dict[str, Any]:
    """Group holds by HOLDREASONNAME and return Pareto-ordered items.

    Each item carries count, qty, pct (share of total count), and cumPct
    (cumulative share). Fix: cumPct is computed from the raw cumulative
    count rather than by summing already-rounded per-row pct values, so
    rounding drift can no longer push the final cumPct off 100.00.
    """
    if df is None or df.empty:
        return {"items": []}
    grouped = (
        df.groupby("HOLDREASONNAME", sort=False)
        .agg(count=("CONTAINERID", "count"), qty=("QTY", "sum"))
        .reset_index()
        .sort_values("count", ascending=False)
    )
    total = grouped["count"].sum()
    items: List[Dict[str, Any]] = []
    cum_count = 0
    for _, row in grouped.iterrows():
        count = _safe_int(row["count"])
        cum_count += count
        items.append(
            {
                # Blank reasons are labeled "(not filled in)" for the UI.
                "reason": _clean_text(row["HOLDREASONNAME"]) or "(未填寫)",
                "count": count,
                "qty": _safe_int(row["qty"]),
                "pct": round((count / total * 100) if total > 0 else 0, 2),
                "cumPct": round((cum_count / total * 100) if total > 0 else 0, 2),
            }
        )
    return {"items": items}
# ============================================================
# Derivation: Duration
# ============================================================
def _derive_duration(df: pd.DataFrame) -> Dict[str, Any]:
    """Summarize released holds into fixed duration buckets.

    Only rows with a RELEASETXNDATE are counted; pct is each bucket's
    share of all released holds.
    """
    if df is None or df.empty:
        return {"items": []}
    done = df[df["RELEASETXNDATE"].notna()]
    total = len(done)
    if total == 0:
        return {"items": []}
    hrs = done["HOLD_HOURS"]
    bucket_masks = (
        ("<4h", hrs < 4),
        ("4-24h", (hrs >= 4) & (hrs < 24)),
        ("1-3d", (hrs >= 24) & (hrs < 72)),
        (">3d", hrs >= 72),
    )
    items: List[Dict[str, Any]] = []
    for label, mask in bucket_masks:
        bucket_count = int(mask.sum())
        items.append(
            {
                "range": label,
                "count": bucket_count,
                "qty": _safe_int(done.loc[mask, "QTY"].sum()),
                "pct": round((bucket_count / total * 100) if total > 0 else 0, 2),
            }
        )
    return {"items": items}
# ============================================================
# Derivation: Paginated list
# ============================================================
def _derive_list(
    df: pd.DataFrame,
    *,
    page: int = 1,
    per_page: int = 50,
) -> Dict[str, Any]:
    """Return one page of hold records, newest hold transaction first."""
    if df is None or df.empty:
        return {
            "items": [],
            "pagination": {
                "page": 1,
                "perPage": per_page,
                "total": 0,
                "totalPages": 1,
            },
        }
    # Clamp paging inputs: page >= 1, 1 <= per_page <= 200.
    page = max(int(page), 1)
    per_page = min(max(int(per_page), 1), 200)
    ordered = df.sort_values("HOLDTXNDATE", ascending=False)
    total = len(ordered)
    start = (page - 1) * per_page
    page_rows = ordered.iloc[start : start + per_page]
    items = [_serialize_hold_row(row) for _, row in page_rows.iterrows()]
    return {
        "items": items,
        "pagination": {
            "page": page,
            "perPage": per_page,
            "total": total,
            # Ceiling division; at least one page even when total == 0.
            "totalPages": max((total + per_page - 1) // per_page, 1),
        },
    }


def _serialize_hold_row(row: pd.Series) -> Dict[str, Any]:
    """Map one cached DataFrame row to the API's camelCase record shape."""
    wc_name = _clean_text(row.get("WORKCENTERNAME"))
    # Prefer the workcenter group label when one is configured.
    wc_group = _get_wc_group(wc_name) if wc_name else None
    return {
        "lotId": _clean_text(row.get("LOT_ID")),
        "workorder": _clean_text(row.get("PJ_WORKORDER")),
        "product": _clean_text(row.get("PRODUCTNAME")),
        "workcenter": wc_group or wc_name,
        "holdReason": _clean_text(row.get("HOLDREASONNAME")),
        "qty": _safe_int(row.get("QTY")),
        "holdDate": _format_datetime(row.get("HOLDTXNDATE")),
        "holdEmp": _clean_text(row.get("HOLDEMP")),
        "holdComment": _clean_text(row.get("HOLDCOMMENTS")),
        "releaseDate": _format_datetime(row.get("RELEASETXNDATE")),
        "releaseEmp": _clean_text(row.get("RELEASEEMP")),
        "releaseComment": _clean_text(row.get("RELEASECOMMENTS")),
        "holdHours": round(_safe_float(row.get("HOLD_HOURS")), 2),
        "ncr": _clean_text(row.get("NCRID")),
        "futureHoldComment": _clean_text(row.get("FUTUREHOLDCOMMENTS")),
    }