feat: cross-tool OOM protection — shared memory guard, Redis cache, RSS projection

Extract interactive memory guard from reject_dataset_cache into reusable
core/interactive_memory_guard.py (two-fence: DataFrame size + RSS projection).

Material Trace: add Redis query cache (TTL=5min), FETCH FIRST 50001 row limit,
upgrade memory guard to RSS projection, forced GC after batched queries.

Query Tool: add EventFetcher 500K row accumulation guard, RSS projection guard
for 6 heavy endpoints, GC after large responses.

Mid-Section Defect: upgrade RQ health check (Redis ping + worker existence +
60s TTL cache), add sync-path RSS guard with 503 response, add stampede lock
for events endpoint, extend analysis lock timeout 90→180s.

Fix SQL comment bind-parameter bug in 4 material_trace SQL templates where
`:p0, :p1` in comments were parsed as bind variables by SQLAlchemy.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
egg
2026-03-05 07:34:34 +08:00
parent 981ae5614e
commit f6a54f357f
18 changed files with 978 additions and 240 deletions

View File

@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
"""Interactive memory guard — reusable DataFrame + RSS projection guard.
Extracted from reject_dataset_cache to allow cross-tool reuse.
Prevents expensive in-memory operations from pushing worker RSS over limit.
Two-fence approach:
Fence 1: Reject if DataFrame alone exceeds max_input_mb
Fence 2: Reject if (current RSS + DataFrame * working_set_factor) exceeds max_projected_rss_mb
"""
from __future__ import annotations
import gc
import logging
import os
from typing import Optional
import pandas as pd
logger = logging.getLogger("mes_dashboard.interactive_memory_guard")
# ============================================================
# RSS / DataFrame measurement helpers
# ============================================================
def process_rss_mb() -> Optional[float]:
    """Return the current process resident-set size (RSS) in MB.

    ``psutil`` is imported lazily because it is an optional runtime
    dependency; if it is missing — or the probe itself fails — the
    function returns ``None`` so callers can fail open.
    """
    try:
        import psutil  # optional dependency: imported at call time only
        rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    except Exception:
        return None
    return rss_bytes / (1024.0 * 1024.0)
def df_memory_mb(df: pd.DataFrame) -> float:
    """Return the deep memory footprint of *df* in MB.

    Returns ``0.0`` for ``None``/empty frames, and also when the
    measurement itself raises (fail-open: never block on metering).
    """
    if df is None or df.empty:
        return 0.0
    try:
        total_bytes = df.memory_usage(deep=True).sum()
    except Exception:
        return 0.0
    return float(total_bytes) / (1024 * 1024)
# ============================================================
# Main guard
# ============================================================
def enforce_dataset_memory_guard(
    df: pd.DataFrame,
    *,
    operation: str,
    query_id: str = "",
    max_input_mb: float = 96.0,
    max_projected_rss_mb: float = 1100.0,
    working_set_factor: float = 1.8,
) -> None:
    """Raise ``MemoryError`` when *df* would push the worker over budget.

    Two fences are applied in order:

    1. Reject when the DataFrame's own deep size exceeds ``max_input_mb``.
    2. Reject when ``current RSS + df_mb * working_set_factor`` exceeds
       ``max_projected_rss_mb``.  If RSS cannot be measured (psutil
       unavailable) this fence is skipped — fail-open by design.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame about to be processed.
    operation : str
        Human-readable label for log/error messages (e.g. "視圖查詢").
    query_id : str
        Optional query identifier for log correlation.
    max_input_mb : float
        Fence-1 limit on the DataFrame's deep size.
    max_projected_rss_mb : float
        Fence-2 limit on projected process RSS.
    working_set_factor : float
        Multiplier estimating peak memory during processing.
    """
    if df is None or df.empty:
        return

    df_mb = df_memory_mb(df)

    # Fence 1: the DataFrame alone is already too large.
    if df_mb > max_input_mb:
        logger.warning(
            "Reject %s due to dataset size guard (query_id=%s, df_mb=%.1f, limit_mb=%.0f)",
            operation, query_id, df_mb, max_input_mb,
        )
        raise MemoryError(
            f"{operation}資料量約 {df_mb:.1f} MB超過 {max_input_mb:.0f} MB 上限,請縮小篩選條件後重試"
        )

    # Fence 2: projected RSS after processing would exceed the ceiling.
    current_rss = process_rss_mb()
    if current_rss is None:
        return  # psutil unavailable — fail open
    projected = current_rss + df_mb * working_set_factor
    if projected <= max_projected_rss_mb:
        return
    logger.warning(
        "Reject %s due to projected RSS guard "
        "(query_id=%s, rss_mb=%.1f, df_mb=%.1f, factor=%.2f, projected_mb=%.1f, limit_mb=%.0f)",
        operation, query_id, current_rss, df_mb, working_set_factor, projected, max_projected_rss_mb,
    )
    raise MemoryError(
        f"目前服務記憶體負載較高RSS {current_rss:.1f} MB暫停{operation}計算以保護系統,"
        "請稍後再試或縮小篩選條件"
    )
def maybe_gc_collect(*, force: bool = True) -> None:
    """Run ``gc.collect()`` after an interactive computation.

    A no-op when ``force`` is false; any error from the collector is
    swallowed — this is strictly best-effort memory hygiene.
    """
    if force:
        try:
            gc.collect()
        except Exception:
            pass

View File

@@ -89,11 +89,8 @@ def _resolve_limit_mb() -> int:
def _current_rss_mb() -> Optional[float]:
"""Return current process RSS in MB via psutil (not peak)."""
try:
import psutil
return psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
except Exception:
return None
from mes_dashboard.core.interactive_memory_guard import process_rss_mb
return process_rss_mb()
# ============================================================

View File

@@ -9,6 +9,7 @@ Contains Flask Blueprint for batch tracing and equipment period query endpoints:
- CSV export functionality
"""
import gc
import hashlib
from flask import Blueprint, jsonify, request, Response, render_template, current_app
@@ -343,16 +344,26 @@ def query_lot_history():
too_large = _reject_if_batch_too_large(cids)
if too_large is not None:
return too_large
result = get_lot_history_batch(cids, workcenter_groups=workcenter_groups)
try:
result = get_lot_history_batch(cids, workcenter_groups=workcenter_groups)
except MemoryError as exc:
return jsonify({'error': str(exc)}), 503
elif container_id:
result = get_lot_history(container_id, workcenter_groups=workcenter_groups)
try:
result = get_lot_history(container_id, workcenter_groups=workcenter_groups)
except MemoryError as exc:
return jsonify({'error': str(exc)}), 503
else:
return jsonify({'error': '請指定 CONTAINERID'}), 400
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
response = jsonify(result)
total = result.get('total', 0)
if total > 10000:
gc.collect()
return response
# ============================================================
@@ -425,7 +436,10 @@ def query_lot_associations():
too_large = _reject_if_batch_too_large(cids)
if too_large is not None:
return too_large
result = get_lot_associations_batch(cids, assoc_type)
try:
result = get_lot_associations_batch(cids, assoc_type)
except MemoryError as exc:
return jsonify({'error': str(exc)}), 503
else:
if not container_id:
return jsonify({'error': '請指定 CONTAINERID'}), 400
@@ -499,35 +513,42 @@ def query_equipment_period():
return jsonify({'error': f'不支援的查詢類型: {query_type}'}), 400
# Execute query based on type
if query_type == 'status_hours':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_status_hours(equipment_ids, start_date, end_date)
try:
if query_type == 'status_hours':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_status_hours(equipment_ids, start_date, end_date)
elif query_type == 'lots':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_lots(equipment_ids, start_date, end_date)
elif query_type == 'lots':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_lots(equipment_ids, start_date, end_date)
elif query_type == 'materials':
if not equipment_names:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_materials(equipment_names, start_date, end_date)
elif query_type == 'materials':
if not equipment_names:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_materials(equipment_names, start_date, end_date)
elif query_type == 'rejects':
if not equipment_names:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_rejects(equipment_names, start_date, end_date)
elif query_type == 'rejects':
if not equipment_names:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_rejects(equipment_names, start_date, end_date)
elif query_type == 'jobs':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_jobs(equipment_ids, start_date, end_date)
elif query_type == 'jobs':
if not equipment_ids:
return jsonify({'error': '請選擇至少一台設備'}), 400
result = get_equipment_jobs(equipment_ids, start_date, end_date)
except MemoryError as exc:
return jsonify({'error': str(exc)}), 503
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
response = jsonify(result)
total = result.get('total', 0)
if total > 10000:
gc.collect()
return response
# ============================================================

View File

@@ -21,7 +21,9 @@ from typing import Any, Dict, List, Optional
from flask import Blueprint, Response, jsonify, request
from mes_dashboard.core.cache import cache_get, cache_set
from mes_dashboard.core.interactive_memory_guard import process_rss_mb
from mes_dashboard.core.rate_limit import configured_rate_limit
from mes_dashboard.core.redis_client import try_acquire_lock, release_lock
from mes_dashboard.core.response import error_response
from mes_dashboard.services.event_fetcher import EventFetcher
from mes_dashboard.services.lineage_engine import LineageEngine
@@ -49,6 +51,14 @@ TRACE_EVENTS_MAX_WORKERS = int(os.getenv('TRACE_EVENTS_MAX_WORKERS', '2'))
TRACE_EVENTS_CID_LIMIT = int(os.getenv('TRACE_EVENTS_CID_LIMIT', '50000'))
TRACE_CACHE_TTL_SECONDS = 300
# RSS guard: reject sync event processing when RSS exceeds this MB threshold
TRACE_SYNC_RSS_REJECT_MB = float(os.getenv("TRACE_SYNC_RSS_REJECT_MB", "1100"))
# Stampede lock for events endpoint sync path
EVENTS_LOCK_TTL_SECONDS = 240
EVENTS_LOCK_WAIT_TIMEOUT_SECONDS = 180
EVENTS_LOCK_POLL_INTERVAL_SECONDS = 0.5
PROFILE_QUERY_TOOL = "query_tool"
PROFILE_QUERY_TOOL_REVERSE = "query_tool_reverse"
PROFILE_MID_SECTION_DEFECT = "mid_section_defect"
@@ -689,6 +699,20 @@ def events():
if cached is not None:
return jsonify(cached)
# --- MD2: RSS guard before sync execution ---
rss_now = process_rss_mb()
if rss_now is not None and rss_now > TRACE_SYNC_RSS_REJECT_MB:
logger.warning(
"trace events sync rejected due to RSS guard rss_mb=%.1f limit_mb=%.0f cid_count=%s",
rss_now, TRACE_SYNC_RSS_REJECT_MB, cid_count,
)
return error_response(
"SERVICE_OVERLOADED",
"伺服器記憶體負載過高,請稍後再試",
status_code=503,
headers={"Retry-After": "30"},
)
logger.info(
"trace events profile=%s domains=%s cid_count=%s correlation_cache_key=%s",
profile,
@@ -697,61 +721,106 @@ def events():
payload.get("cache_key"),
)
started = time.monotonic()
results: Dict[str, Dict[str, Any]] = {}
raw_domain_results: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
failed_domains: List[str] = []
# --- MD4: Stampede lock for sync path ---
cid_hash = hashlib.md5("|".join(sorted(container_ids)).encode("utf-8")).hexdigest()
lock_key = f"trace:events:{cid_hash}"
lock_acquired = try_acquire_lock(lock_key, ttl_seconds=EVENTS_LOCK_TTL_SECONDS)
with ThreadPoolExecutor(max_workers=min(len(domains), TRACE_EVENTS_MAX_WORKERS)) as executor:
futures = {
executor.submit(EventFetcher.fetch_events, container_ids, domain): domain
for domain in domains
if not lock_acquired:
# Another worker is processing the same CID set — poll cache for result
wait_start = time.monotonic()
while (time.monotonic() - wait_start) < EVENTS_LOCK_WAIT_TIMEOUT_SECONDS:
cached = cache_get(events_cache_key)
if cached is not None:
return jsonify(cached)
time.sleep(EVENTS_LOCK_POLL_INTERVAL_SECONDS)
logger.warning(
"trace events stampede lock wait timeout; proceeding with fail-open execution "
"cid_hash=%s cid_count=%s",
cid_hash[:12], cid_count,
)
try:
# Double-check cache after acquiring lock
if lock_acquired:
cached = cache_get(events_cache_key)
if cached is not None:
return jsonify(cached)
started = time.monotonic()
results: Dict[str, Dict[str, Any]] = {}
raw_domain_results: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
failed_domains: List[str] = []
with ThreadPoolExecutor(max_workers=min(len(domains), TRACE_EVENTS_MAX_WORKERS)) as executor:
futures = {
executor.submit(EventFetcher.fetch_events, container_ids, domain): domain
for domain in domains
}
for future in as_completed(futures):
domain = futures[future]
try:
events_by_cid = future.result()
rows = _flatten_domain_records(events_by_cid)
results[domain] = {"data": rows, "count": len(rows)}
if is_msd:
raw_domain_results[domain] = events_by_cid
# Non-MSD: events_by_cid goes out of scope here, no double retention
except Exception as exc:
logger.error("events stage domain failed domain=%s: %s", domain, exc, exc_info=True)
failed_domains.append(domain)
elapsed = time.monotonic() - started
if elapsed > TRACE_SLOW_THRESHOLD_SECONDS:
logger.warning("trace events slow elapsed=%.2fs domains=%s", elapsed, ",".join(domains))
# --- MD2: RSS guard before aggregation computation ---
aggregation = None
if profile == PROFILE_MID_SECTION_DEFECT:
rss_before_agg = process_rss_mb()
if rss_before_agg is not None and rss_before_agg > TRACE_SYNC_RSS_REJECT_MB:
del raw_domain_results
logger.warning(
"trace events aggregation rejected due to RSS guard rss_mb=%.1f limit_mb=%.0f",
rss_before_agg, TRACE_SYNC_RSS_REJECT_MB,
)
return error_response(
"SERVICE_OVERLOADED",
"伺服器記憶體負載過高,無法完成聚合計算,請稍後再試",
status_code=503,
headers={"Retry-After": "30"},
)
aggregation, agg_error = _build_msd_aggregation(payload, raw_domain_results)
del raw_domain_results
if agg_error is not None:
code, message, status = agg_error
return _error(code, message, status)
else:
del raw_domain_results
response: Dict[str, Any] = {
"stage": "events",
"results": results,
"aggregation": aggregation,
}
for future in as_completed(futures):
domain = futures[future]
try:
events_by_cid = future.result()
rows = _flatten_domain_records(events_by_cid)
results[domain] = {"data": rows, "count": len(rows)}
if is_msd:
raw_domain_results[domain] = events_by_cid
# Non-MSD: events_by_cid goes out of scope here, no double retention
except Exception as exc:
logger.error("events stage domain failed domain=%s: %s", domain, exc, exc_info=True)
failed_domains.append(domain)
elapsed = time.monotonic() - started
if elapsed > TRACE_SLOW_THRESHOLD_SECONDS:
logger.warning("trace events slow elapsed=%.2fs domains=%s", elapsed, ",".join(domains))
if failed_domains:
response["error"] = "one or more domains failed"
response["code"] = "EVENTS_PARTIAL_FAILURE"
response["failed_domains"] = sorted(failed_domains)
aggregation = None
if profile == PROFILE_MID_SECTION_DEFECT:
aggregation, agg_error = _build_msd_aggregation(payload, raw_domain_results)
del raw_domain_results
if agg_error is not None:
code, message, status = agg_error
return _error(code, message, status)
else:
del raw_domain_results
if not is_msd and len(container_ids) <= 10000:
cache_set(events_cache_key, response, ttl=TRACE_CACHE_TTL_SECONDS)
response: Dict[str, Any] = {
"stage": "events",
"results": results,
"aggregation": aggregation,
}
if len(container_ids) > 10000:
gc.collect()
if failed_domains:
response["error"] = "one or more domains failed"
response["code"] = "EVENTS_PARTIAL_FAILURE"
response["failed_domains"] = sorted(failed_domains)
if not is_msd and len(container_ids) <= 10000:
cache_set(events_cache_key, response, ttl=TRACE_CACHE_TTL_SECONDS)
if len(container_ids) > 10000:
gc.collect()
return jsonify(response)
return jsonify(response)
finally:
if lock_acquired:
release_lock(lock_key)
@trace_bp.route("/job/<job_id>", methods=["GET"])

View File

@@ -21,6 +21,7 @@ logger = logging.getLogger("mes_dashboard.event_fetcher")
ORACLE_IN_BATCH_SIZE = 1000
EVENT_FETCHER_MAX_WORKERS = int(os.getenv('EVENT_FETCHER_MAX_WORKERS', '2'))
CACHE_SKIP_CID_THRESHOLD = int(os.getenv('EVENT_FETCHER_CACHE_SKIP_CID_THRESHOLD', '10000'))
EVENT_FETCHER_MAX_TOTAL_ROWS = int(os.getenv('EVENT_FETCHER_MAX_TOTAL_ROWS', '500000'))
EVENT_FETCHER_ALLOW_PARTIAL_RESULTS = (
os.getenv('EVENT_FETCHER_ALLOW_PARTIAL_RESULTS', 'false').strip().lower()
in {'1', 'true', 'yes', 'on'}
@@ -238,6 +239,9 @@ class EventFetcher:
return cached
grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
total_row_count = [0] # mutable counter shared across batches
truncated = [False]
max_total_rows = EVENT_FETCHER_MAX_TOTAL_ROWS
spec = _DOMAIN_SPECS[domain]
filter_column = spec["filter_column"]
match_mode = spec.get("match_mode", "in")
@@ -250,6 +254,8 @@ class EventFetcher:
def _fetch_and_group_batch(batch_ids):
"""Fetch a batch using fetchmany iterator and group into ``grouped``."""
if truncated[0]:
return
builder = QueryBuilder()
if match_mode == "contains":
builder.add_or_like_conditions(filter_column, batch_ids, position="both")
@@ -258,6 +264,8 @@ class EventFetcher:
sql = EventFetcher._build_domain_sql(domain, builder.get_conditions_sql())
for columns, rows in read_sql_df_slow_iter(sql, builder.params, timeout_seconds=60):
if truncated[0]:
break
for row in rows:
record = {k: _sanitize_value(v) for k, v in zip(columns, row)}
if domain == "jobs":
@@ -269,11 +277,27 @@ class EventFetcher:
enriched = dict(record)
enriched["CONTAINERID"] = cid
grouped[cid].append(enriched)
total_row_count[0] += 1
if total_row_count[0] >= max_total_rows:
logger.warning(
"EventFetcher total-row guard triggered domain=%s rows=%s limit=%s — truncating",
domain, total_row_count[0], max_total_rows,
)
truncated[0] = True
break
continue
cid = record.get("CONTAINERID")
if not isinstance(cid, str) or not cid:
continue
grouped[cid].append(record)
total_row_count[0] += 1
if total_row_count[0] >= max_total_rows:
logger.warning(
"EventFetcher total-row guard triggered domain=%s rows=%s limit=%s — truncating",
domain, total_row_count[0], max_total_rows,
)
truncated[0] = True
break
batches = [
normalized_ids[i:i + ORACLE_IN_BATCH_SIZE]
@@ -305,6 +329,15 @@ class EventFetcher:
result = dict(grouped)
del grouped
# Attach truncation metadata (backwards-compatible: callers that don't
# know about __meta__ simply iterate CID keys and skip non-list values).
if truncated[0]:
result["__meta__"] = {
"truncated": True,
"total_rows_fetched": total_row_count[0],
"max_total_rows": max_total_rows,
}
if len(normalized_ids) <= CACHE_SKIP_CID_THRESHOLD:
cache_set(cache_key, result, ttl=_DOMAIN_SPECS[domain]["cache_ttl"])
else:
@@ -313,9 +346,10 @@ class EventFetcher:
domain, len(normalized_ids), CACHE_SKIP_CID_THRESHOLD,
)
logger.info(
"EventFetcher fetched domain=%s queried_cids=%s hit_cids=%s",
"EventFetcher fetched domain=%s queried_cids=%s hit_cids=%s truncated=%s",
domain,
len(normalized_ids),
len(result),
truncated[0],
)
return result

View File

@@ -11,6 +11,12 @@ from typing import Any, Dict, List, Optional
import pandas as pd
from mes_dashboard.core.database import read_sql_df, read_sql_df_slow
from mes_dashboard.core.interactive_memory_guard import (
enforce_dataset_memory_guard,
maybe_gc_collect,
)
from mes_dashboard.core.redis_df_store import redis_load_df, redis_store_df
from mes_dashboard.services.batch_query_engine import compute_query_hash
from mes_dashboard.services.container_resolution_policy import (
validate_resolution_request,
)
@@ -23,6 +29,7 @@ from mes_dashboard.sql import QueryBuilder, SQLLoader
logger = logging.getLogger("mes_dashboard.material_trace")
_REVERSE_MAX_ROWS = 10_000
_FORWARD_MAX_ROWS = 50_000
_EXPORT_MAX_ROWS = 50_000
# Safeguard: max DataFrame memory (MB) before aborting — same pattern as batch_query_engine
@@ -31,6 +38,9 @@ _MAX_RESULT_MB = int(os.getenv("MATERIAL_TRACE_MAX_RESULT_MB", "256"))
# Safeguard: IN-clause batch size — Oracle has practical limits on large IN lists
_IN_BATCH_SIZE = 1000
# Redis result cache TTL (seconds)
_CACHE_TTL = 300
_CSV_COLUMNS = {
"CONTAINERNAME": "LOT ID",
"PJ_WORKORDER": "工單",
@@ -153,13 +163,20 @@ def _resolve_container_ids(
return resolved_ids, name_to_id, unresolved
def _check_memory_guard(df: pd.DataFrame) -> None:
"""Raise if DataFrame exceeds memory threshold."""
mem_mb = df.memory_usage(deep=True).sum() / (1024 * 1024)
if mem_mb > _MAX_RESULT_MB:
raise MemoryError(
f"查詢結果佔用 {mem_mb:.0f} MB超過 {_MAX_RESULT_MB} MB 上限,請縮小查詢範圍"
)
def _check_memory_guard(df: pd.DataFrame, *, query_id: str = "") -> None:
"""Raise if DataFrame exceeds memory / RSS thresholds.
Delegates to the shared interactive_memory_guard for two-fence protection.
Kept as a thin wrapper so existing call-sites remain unchanged.
"""
enforce_dataset_memory_guard(
df,
operation="物料追溯查詢",
query_id=query_id,
max_input_mb=float(_MAX_RESULT_MB),
max_projected_rss_mb=1100.0,
working_set_factor=1.8,
)
def _execute_batched_query(
@@ -238,6 +255,42 @@ def _paginate(df: pd.DataFrame, page: int, per_page: int) -> Dict[str, Any]:
}
# ============================================================
# Redis result cache helpers
# ============================================================
def _compute_cache_key(
    mode: str,
    values: List[str],
    workcenter_groups: Optional[List[str]] = None,
) -> str:
    """Build a deterministic Redis key for a material-trace query.

    Inputs are sorted before hashing so that the same logical query
    (values/groups in any order) maps to the same cache entry.
    """
    groups = sorted(workcenter_groups) if workcenter_groups else []
    digest = compute_query_hash(
        {
            "mode": mode,
            "values": sorted(values),
            "workcenter_groups": groups,
        }
    )
    return f"mt:result:{digest}"
def _try_load_cached_df(cache_key: str) -> Optional[pd.DataFrame]:
    """Best-effort load of a cached DataFrame from Redis.

    Returns ``None`` on cache miss or any Redis error — the caller then
    falls back to querying Oracle directly.
    """
    try:
        cached = redis_load_df(cache_key)
    except Exception as exc:
        logger.debug("Redis cache load failed (%s): %s", cache_key, exc)
        return None
    return cached
def _try_store_cached_df(cache_key: str, df: pd.DataFrame) -> None:
    """Best-effort store of a DataFrame into the Redis result cache.

    Failures are logged at debug level and otherwise ignored — caching
    is an optimization, never a correctness requirement.
    """
    try:
        redis_store_df(cache_key, df, ttl=_CACHE_TTL)
    except Exception as exc:
        logger.debug("Redis cache store failed (%s): %s", cache_key, exc)
# ============================================================
# Forward query (LOT ID / Work Order → Materials)
# ============================================================
@@ -252,6 +305,16 @@ def forward_query(
) -> Dict[str, Any]:
"""Execute forward material trace query."""
meta: Dict[str, Any] = {}
cache_key = _compute_cache_key(mode, values, workcenter_groups)
# Try Redis cache first (pagination and re-queries skip Oracle)
cached_df = _try_load_cached_df(cache_key)
if cached_df is not None:
logger.debug("Forward query cache hit: %s", cache_key)
result = _paginate(cached_df, page, per_page)
result["meta"] = meta
return result
wc_names = _resolve_workcenter_names(workcenter_groups)
if mode == "lot":
@@ -266,10 +329,22 @@ def forward_query(
else: # workorder
df = _execute_batched_query("material_trace/forward_by_workorder", "m.PJ_WORKORDER", values, wc_names)
maybe_gc_collect()
if df.empty:
return {"rows": [], "pagination": {"page": 1, "per_page": per_page, "total": 0, "total_pages": 0}, "meta": meta}
# Forward truncation — SQL fetches 50001 rows to detect overflow
if len(df) > _FORWARD_MAX_ROWS:
df = df.iloc[:_FORWARD_MAX_ROWS]
meta["truncated"] = True
meta["max_rows"] = _FORWARD_MAX_ROWS
df = _enrich_workcenter_group(df)
# Store enriched result in Redis for subsequent pagination / export
_try_store_cached_df(cache_key, df)
result = _paginate(df, page, per_page)
result["meta"] = meta
return result
@@ -288,10 +363,22 @@ def reverse_query(
) -> Dict[str, Any]:
"""Execute reverse material trace query."""
meta: Dict[str, Any] = {}
cache_key = _compute_cache_key("material_lot", values, workcenter_groups)
# Try Redis cache first
cached_df = _try_load_cached_df(cache_key)
if cached_df is not None:
logger.debug("Reverse query cache hit: %s", cache_key)
result = _paginate(cached_df, page, per_page)
result["meta"] = meta
return result
wc_names = _resolve_workcenter_names(workcenter_groups)
df = _execute_batched_query("material_trace/reverse_by_material_lot", "m.MATERIALLOTNAME", values, wc_names)
maybe_gc_collect()
if df.empty:
return {"rows": [], "pagination": {"page": 1, "per_page": per_page, "total": 0, "total_pages": 0}, "meta": meta}
@@ -302,6 +389,10 @@ def reverse_query(
meta["max_rows"] = _REVERSE_MAX_ROWS
df = _enrich_workcenter_group(df)
# Store enriched result in Redis for subsequent pagination / export
_try_store_cached_df(cache_key, df)
result = _paginate(df, page, per_page)
result["meta"] = meta
return result
@@ -319,22 +410,37 @@ def export_csv(
) -> tuple[bytes, Dict[str, Any]]:
"""Export query results as UTF-8 BOM CSV."""
meta: Dict[str, Any] = {}
wc_names = _resolve_workcenter_names(workcenter_groups)
if mode == "lot":
container_ids, _name_map, unresolved = _resolve_container_ids(values)
if unresolved:
meta["unresolved"] = unresolved
if not container_ids:
# Try Redis cache first — avoids re-querying Oracle for export
cache_key = _compute_cache_key(mode, values, workcenter_groups)
df = _try_load_cached_df(cache_key)
if df is None:
# Cache miss — execute query
wc_names = _resolve_workcenter_names(workcenter_groups)
if mode == "lot":
container_ids, _name_map, unresolved = _resolve_container_ids(values)
if unresolved:
meta["unresolved"] = unresolved
if not container_ids:
return _empty_csv(), meta
df = _execute_batched_query("material_trace/forward_by_lot", "m.CONTAINERID", container_ids, wc_names, allow_patterns=False)
elif mode == "workorder":
df = _execute_batched_query("material_trace/forward_by_workorder", "m.PJ_WORKORDER", values, wc_names)
else: # material_lot
df = _execute_batched_query("material_trace/reverse_by_material_lot", "m.MATERIALLOTNAME", values, wc_names)
if df.empty:
return _empty_csv(), meta
df = _execute_batched_query("material_trace/forward_by_lot", "m.CONTAINERID", container_ids, wc_names, allow_patterns=False)
df = _enrich_workcenter_group(df)
elif mode == "workorder":
df = _execute_batched_query("material_trace/forward_by_workorder", "m.PJ_WORKORDER", values, wc_names)
else: # material_lot
df = _execute_batched_query("material_trace/reverse_by_material_lot", "m.MATERIALLOTNAME", values, wc_names)
# Store in cache for potential subsequent requests
_try_store_cached_df(cache_key, df)
if df.empty:
return _empty_csv(), meta
@@ -345,8 +451,6 @@ def export_csv(
meta["truncated"] = True
meta["export_max_rows"] = _EXPORT_MAX_ROWS
df = _enrich_workcenter_group(df)
# Select and rename columns for CSV
available_cols = [c for c in _CSV_COLUMNS if c in df.columns]
export_df = df[available_cols].rename(columns=_CSV_COLUMNS)

View File

@@ -64,8 +64,8 @@ CACHE_TTL_LOSS_REASONS = 86400 # 24h for loss reason list (daily sync)
FORWARD_PIPELINE_MAX_WORKERS = int(os.getenv('FORWARD_PIPELINE_MAX_WORKERS', '2'))
# Distributed lock settings for query_analysis cold-cache path
ANALYSIS_LOCK_TTL_SECONDS = 120
ANALYSIS_LOCK_WAIT_TIMEOUT_SECONDS = 90
ANALYSIS_LOCK_TTL_SECONDS = 240
ANALYSIS_LOCK_WAIT_TIMEOUT_SECONDS = 180
ANALYSIS_LOCK_POLL_INTERVAL_SECONDS = 0.5
# Top N for chart display (rest grouped as "其他")
@@ -863,7 +863,7 @@ def _fetch_station_detection_data(
) -> Optional[pd.DataFrame]:
"""Execute station_detection.sql and return raw DataFrame.
For date ranges exceeding BATCH_QUERY_TIME_THRESHOLD_DAYS (default 10),
For date ranges exceeding BATCH_QUERY_TIME_THRESHOLD_DAYS (default 10),
the query is decomposed into monthly chunks via BatchQueryEngine to
prevent Oracle timeout on high-volume stations.
"""

View File

@@ -26,15 +26,16 @@ from typing import Any, Dict, List, Optional, Generator, Iterable, Tuple
import pandas as pd
from mes_dashboard.core.database import read_sql_df
from mes_dashboard.sql import QueryBuilder, SQLLoader
from mes_dashboard.services.container_resolution_policy import (
assess_resolution_result,
normalize_input_values,
validate_resolution_request,
validate_resolution_result,
)
from mes_dashboard.services.event_fetcher import EventFetcher
from mes_dashboard.core.database import read_sql_df
from mes_dashboard.sql import QueryBuilder, SQLLoader
from mes_dashboard.services.container_resolution_policy import (
assess_resolution_result,
normalize_input_values,
validate_resolution_request,
validate_resolution_result,
)
from mes_dashboard.core.interactive_memory_guard import process_rss_mb
from mes_dashboard.services.event_fetcher import EventFetcher
try:
from mes_dashboard.core.database import read_sql_df_slow
@@ -55,6 +56,27 @@ MAX_EQUIPMENTS = 20
MAX_DATE_RANGE_DAYS = 365
DEFAULT_TIME_WINDOW_HOURS = 168 # 1 week for better PJ_TYPE detection
ADJACENT_LOTS_COUNT = 3
QUERY_TOOL_RSS_REJECT_MB = float(os.getenv('QUERY_TOOL_RSS_REJECT_MB', '1100'))
def _check_rss_guard(operation: str) -> None:
    """Reject the request when current worker RSS exceeds the safety limit.

    Raises ``MemoryError`` with a user-facing 中文 message so the route
    layer can convert it to a 503 / error JSON.  Fails open (returns
    silently) when RSS cannot be measured, e.g. psutil unavailable.
    """
    rss = process_rss_mb()
    if rss is None or rss <= QUERY_TOOL_RSS_REJECT_MB:
        return
    logger.warning(
        "RSS guard rejected %s — rss_mb=%.1f limit_mb=%.0f",
        operation, rss, QUERY_TOOL_RSS_REJECT_MB,
    )
    raise MemoryError(
        f"目前服務記憶體負載較高RSS {rss:.1f} MB暫停{operation}以保護系統,"
        "請稍後再試或縮小查詢範圍"
    )
def _max_batch_container_ids() -> int:
@@ -95,7 +117,7 @@ def validate_date_range(start_date: str, end_date: str, max_days: int = MAX_DATE
return f'日期格式錯誤: {e}'
def validate_lot_input(input_type: str, values: List[str]) -> Optional[str]:
def validate_lot_input(input_type: str, values: List[str]) -> Optional[str]:
"""Validate LOT input based on type.
Args:
@@ -105,7 +127,7 @@ def validate_lot_input(input_type: str, values: List[str]) -> Optional[str]:
Returns:
Error message if validation fails, None if valid.
"""
return validate_resolution_request(input_type, values)
return validate_resolution_request(input_type, values)
def validate_equipment_input(equipment_ids: List[str]) -> Optional[str]:
@@ -334,50 +356,50 @@ def resolve_lots(input_type: str, values: List[str]) -> Dict[str, Any]:
return {'error': validation_error}
# Clean values
cleaned = normalize_input_values(values)
if not cleaned:
return {'error': '請輸入有效的查詢條件'}
cleaned = normalize_input_values(values)
if not cleaned:
return {'error': '請輸入有效的查詢條件'}
try:
if input_type == 'lot_id':
result = _resolve_by_lot_id(cleaned)
elif input_type == 'wafer_lot':
result = _resolve_by_wafer_lot(cleaned)
elif input_type == 'gd_lot_id':
result = _resolve_by_gd_lot_id(cleaned)
elif input_type == 'serial_number':
result = _resolve_by_serial_number(cleaned)
elif input_type == 'work_order':
result = _resolve_by_work_order(cleaned)
elif input_type == 'gd_work_order':
result = _resolve_by_gd_work_order(cleaned)
else:
return {'error': f'不支援的輸入類型: {input_type}'}
guard_assessment = assess_resolution_result(result)
overflow_tokens = guard_assessment.get("expansion_offenders") or []
overflow_total = bool(guard_assessment.get("over_container_limit"))
if overflow_tokens or overflow_total:
logger.warning(
"Resolution guardrail overflow (input_type=%s, offenders=%s, resolved=%s, max=%s); continuing with decompose path",
input_type,
len(overflow_tokens),
guard_assessment.get("resolved_container_ids"),
guard_assessment.get("max_container_ids"),
)
result["guardrail"] = {
"overflow": True,
"expansion_offenders": overflow_tokens,
"resolved_container_ids": guard_assessment.get("resolved_container_ids"),
"max_container_ids": guard_assessment.get("max_container_ids"),
}
# Keep compatibility: validation API remains available for strict call sites.
guard_error = validate_resolution_result(result, strict=False)
if guard_error:
return {'error': guard_error}
return result
except Exception as exc:
if input_type == 'lot_id':
result = _resolve_by_lot_id(cleaned)
elif input_type == 'wafer_lot':
result = _resolve_by_wafer_lot(cleaned)
elif input_type == 'gd_lot_id':
result = _resolve_by_gd_lot_id(cleaned)
elif input_type == 'serial_number':
result = _resolve_by_serial_number(cleaned)
elif input_type == 'work_order':
result = _resolve_by_work_order(cleaned)
elif input_type == 'gd_work_order':
result = _resolve_by_gd_work_order(cleaned)
else:
return {'error': f'不支援的輸入類型: {input_type}'}
guard_assessment = assess_resolution_result(result)
overflow_tokens = guard_assessment.get("expansion_offenders") or []
overflow_total = bool(guard_assessment.get("over_container_limit"))
if overflow_tokens or overflow_total:
logger.warning(
"Resolution guardrail overflow (input_type=%s, offenders=%s, resolved=%s, max=%s); continuing with decompose path",
input_type,
len(overflow_tokens),
guard_assessment.get("resolved_container_ids"),
guard_assessment.get("max_container_ids"),
)
result["guardrail"] = {
"overflow": True,
"expansion_offenders": overflow_tokens,
"resolved_container_ids": guard_assessment.get("resolved_container_ids"),
"max_container_ids": guard_assessment.get("max_container_ids"),
}
# Keep compatibility: validation API remains available for strict call sites.
guard_error = validate_resolution_result(result, strict=False)
if guard_error:
return {'error': guard_error}
return result
except Exception as exc:
logger.error(f"LOT resolution failed: {exc}")
return {'error': f'解析失敗: {str(exc)}'}
@@ -797,6 +819,7 @@ def get_lot_history(
return {'error': '請指定 CONTAINERID'}
try:
_check_rss_guard("LOT 生產履歷查詢")
events_by_cid = EventFetcher.fetch_events([container_id], "history")
rows = list(events_by_cid.get(container_id, []))
@@ -826,6 +849,8 @@ def get_lot_history(
'filtered_by_groups': workcenter_groups or [],
}
except MemoryError:
raise
except Exception as exc:
logger.error(f"LOT history query failed for {container_id}: {exc}")
return {'error': f'查詢失敗: {str(exc)}'}
@@ -908,6 +933,7 @@ def get_lot_history_batch(
return {'error': f'container_ids 數量不可超過 {max_ids}'}
try:
_check_rss_guard("LOT 批次生產履歷查詢")
events_by_cid = EventFetcher.fetch_events(container_ids, "history")
rows = []
@@ -938,6 +964,8 @@ def get_lot_history_batch(
'filtered_by_groups': workcenter_groups or [],
}
except MemoryError:
raise
except Exception as exc:
logger.error(f"LOT history batch query failed: {exc}")
return {'error': f'查詢失敗: {str(exc)}'}
@@ -967,6 +995,7 @@ def get_lot_associations_batch(
return {'error': f'批次查詢不支援類型: {assoc_type}'}
try:
_check_rss_guard(f"LOT 批次{assoc_type}查詢")
events_by_cid = EventFetcher.fetch_events(container_ids, assoc_type)
rows = []
@@ -991,6 +1020,8 @@ def get_lot_associations_batch(
'container_ids': container_ids,
}
except MemoryError:
raise
except Exception as exc:
logger.error(f"LOT {assoc_type} batch query failed: {exc}")
return {'error': f'查詢失敗: {str(exc)}'}
@@ -1602,6 +1633,7 @@ def get_equipment_lots(
return {'error': validation_error}
try:
_check_rss_guard("設備批次 LOT 查詢")
builder = QueryBuilder()
builder.add_in_condition("h.EQUIPMENTID", equipment_ids)
sql = SQLLoader.load_with_params(
@@ -1622,6 +1654,8 @@ def get_equipment_lots(
'date_range': {'start': start_date, 'end': end_date},
}
except MemoryError:
raise
except Exception as exc:
logger.error(f"Equipment lots query failed: {exc}")
return {'error': f'查詢失敗: {str(exc)}'}
@@ -1652,6 +1686,7 @@ def get_equipment_materials(
return {'error': validation_error}
try:
_check_rss_guard("設備原料消耗查詢")
builder = QueryBuilder()
builder.add_in_condition("EQUIPMENTNAME", equipment_names)
sql = SQLLoader.load_with_params(
@@ -1672,6 +1707,8 @@ def get_equipment_materials(
'date_range': {'start': start_date, 'end': end_date},
}
except MemoryError:
raise
except Exception as exc:
logger.error(f"Equipment materials query failed: {exc}")
return {'error': f'查詢失敗: {str(exc)}'}
@@ -1702,6 +1739,7 @@ def get_equipment_rejects(
return {'error': validation_error}
try:
_check_rss_guard("設備不良品查詢")
builder = QueryBuilder()
builder.add_in_condition("EQUIPMENTNAME", equipment_names)
sql = SQLLoader.load_with_params(
@@ -1722,6 +1760,8 @@ def get_equipment_rejects(
'date_range': {'start': start_date, 'end': end_date},
}
except MemoryError:
raise
except Exception as exc:
logger.error(f"Equipment rejects query failed: {exc}")
return {'error': f'查詢失敗: {str(exc)}'}

View File

@@ -323,75 +323,31 @@ def _store_query_result(query_id: str, df: pd.DataFrame) -> bool:
def _df_memory_mb(df: pd.DataFrame) -> float:
if df is None or df.empty:
return 0.0
try:
return float(df.memory_usage(deep=True).sum()) / (1024 * 1024)
except Exception:
return 0.0
from mes_dashboard.core.interactive_memory_guard import df_memory_mb
return df_memory_mb(df)
def _process_rss_mb() -> Optional[float]:
try:
import psutil # local import: optional runtime dependency
except Exception:
return None
try:
process = psutil.Process(os.getpid())
return float(process.memory_info().rss) / (1024 * 1024)
except Exception:
return None
from mes_dashboard.core.interactive_memory_guard import process_rss_mb
return process_rss_mb()
def _enforce_interactive_memory_guard(df: pd.DataFrame, *, operation: str, query_id: str) -> None:
"""Prevent expensive cache-based recomputation from pushing worker memory over limit."""
if df is None or df.empty:
return
df_mb = _df_memory_mb(df)
if df_mb > float(_REJECT_DERIVE_MAX_INPUT_MB):
logger.warning(
"Reject %s due to dataset size guard (query_id=%s, df_mb=%.1f, limit_mb=%d)",
operation,
query_id,
df_mb,
_REJECT_DERIVE_MAX_INPUT_MB,
)
raise MemoryError(
f"{operation}資料量約 {df_mb:.1f} MB超過 {_REJECT_DERIVE_MAX_INPUT_MB} MB 上限,請縮小篩選條件後重試"
)
rss_mb = _process_rss_mb()
if rss_mb is None:
return
projected_rss_mb = rss_mb + (df_mb * float(_REJECT_DERIVE_WORKING_SET_FACTOR))
if projected_rss_mb > float(_REJECT_DERIVE_MAX_PROJECTED_RSS_MB):
logger.warning(
"Reject %s due to projected RSS guard (query_id=%s, rss_mb=%.1f, df_mb=%.1f, factor=%.2f, projected_mb=%.1f, limit_mb=%d)",
operation,
query_id,
rss_mb,
df_mb,
_REJECT_DERIVE_WORKING_SET_FACTOR,
projected_rss_mb,
_REJECT_DERIVE_MAX_PROJECTED_RSS_MB,
)
raise MemoryError(
(
f"目前服務記憶體負載較高RSS {rss_mb:.1f} MB暫停{operation}計算以保護系統,"
"請稍後再試或縮小篩選條件"
)
)
from mes_dashboard.core.interactive_memory_guard import enforce_dataset_memory_guard
enforce_dataset_memory_guard(
df,
operation=operation,
query_id=query_id,
max_input_mb=float(_REJECT_DERIVE_MAX_INPUT_MB),
max_projected_rss_mb=float(_REJECT_DERIVE_MAX_PROJECTED_RSS_MB),
working_set_factor=float(_REJECT_DERIVE_WORKING_SET_FACTOR),
)
def _maybe_collect_after_interactive_compute() -> None:
if not _REJECT_DERIVE_FORCE_GC:
return
try:
gc.collect()
except Exception:
return
from mes_dashboard.core.interactive_memory_guard import maybe_gc_collect
maybe_gc_collect(force=_REJECT_DERIVE_FORCE_GC)
# ============================================================

View File

@@ -36,6 +36,10 @@ TRACE_STREAM_BATCH_SIZE = int(os.getenv("TRACE_STREAM_BATCH_SIZE", "5000"))
# ---------------------------------------------------------------------------
_RQ_AVAILABLE: Optional[bool] = None
# TTL-based cache for RQ health checks to avoid checking every request
_RQ_HEALTH_TTL_SECONDS = 60
_rq_health_cache: Dict[str, Any] = {"available": None, "checked_at": 0.0}
def _check_rq_available() -> bool:
global _RQ_AVAILABLE
@@ -49,11 +53,55 @@ def _check_rq_available() -> bool:
def is_async_available() -> bool:
"""Return True if RQ is installed and Redis is reachable."""
"""Return True if RQ is installed, Redis is reachable, and workers exist.
Results are cached for 60 seconds to avoid checking every request.
Falls back gracefully: if any check fails, returns False.
"""
if not _check_rq_available():
return False
# Return cached result if within TTL
now = time.monotonic()
if (
_rq_health_cache["available"] is not None
and (now - _rq_health_cache["checked_at"]) < _RQ_HEALTH_TTL_SECONDS
):
return _rq_health_cache["available"]
conn = get_redis_client()
return conn is not None
if conn is None:
_rq_health_cache["available"] = False
_rq_health_cache["checked_at"] = now
return False
# Actual Redis ping check
try:
conn.ping()
except Exception:
logger.warning("RQ health check: Redis ping failed — marking async unavailable")
_rq_health_cache["available"] = False
_rq_health_cache["checked_at"] = now
return False
# RQ worker existence check
try:
import rq
workers = rq.Worker.all(connection=conn)
if not workers:
logger.warning("RQ health check: no workers found — marking async unavailable")
_rq_health_cache["available"] = False
_rq_health_cache["checked_at"] = now
return False
except Exception:
logger.warning("RQ health check: worker query failed — marking async unavailable")
_rq_health_cache["available"] = False
_rq_health_cache["checked_at"] = now
return False
_rq_health_cache["available"] = True
_rq_health_cache["checked_at"] = now
return True
def _get_rq_queue():

View File

@@ -3,7 +3,7 @@
--
-- Parameters:
-- Bind variables generated by QueryBuilder.add_in_condition()
-- for CONTAINERID IN (:p0, :p1, ...)
-- for CONTAINERID IN (p0, p1, ...)
--
-- Template slots:
-- {{ WHERE_CLAUSE }} - Dynamic WHERE with CONTAINERID IN + optional WORKCENTERNAME IN
@@ -27,3 +27,4 @@ LEFT JOIN DWH.DW_MES_CONTAINER c
ON c.CONTAINERID = m.CONTAINERID
{{ WHERE_CLAUSE }}
ORDER BY m.TXNDATE DESC
FETCH FIRST 50001 ROWS ONLY

View File

@@ -3,7 +3,7 @@
--
-- Parameters:
-- Bind variables generated by QueryBuilder.add_in_condition()
-- for PJ_WORKORDER IN (:p0, :p1, ...)
-- for PJ_WORKORDER IN (p0, p1, ...)
--
-- Template slots:
-- {{ WHERE_CLAUSE }} - Dynamic WHERE with PJ_WORKORDER IN + optional WORKCENTERNAME IN
@@ -27,3 +27,4 @@ LEFT JOIN DWH.DW_MES_CONTAINER c
ON c.CONTAINERID = m.CONTAINERID
{{ WHERE_CLAUSE }}
ORDER BY m.TXNDATE DESC
FETCH FIRST 50001 ROWS ONLY

View File

@@ -3,7 +3,7 @@
--
-- Parameters:
-- Bind variables generated by QueryBuilder.add_in_condition()
-- for CONTAINERNAME IN (:p0, :p1, ...)
-- for CONTAINERNAME IN (p0, p1, ...)
SELECT
c.CONTAINERID,

View File

@@ -3,7 +3,7 @@
--
-- Parameters:
-- Bind variables generated by QueryBuilder.add_in_condition()
-- for MATERIALLOTNAME IN (:p0, :p1, ...)
-- for MATERIALLOTNAME IN (p0, p1, ...)
--
-- Template slots:
-- {{ WHERE_CLAUSE }} - Dynamic WHERE with MATERIALLOTNAME IN + optional WORKCENTERNAME IN

View File

@@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
"""Tests for core/interactive_memory_guard module."""
from __future__ import annotations
from unittest.mock import patch
import pandas as pd
import pytest
from mes_dashboard.core.interactive_memory_guard import (
df_memory_mb,
enforce_dataset_memory_guard,
maybe_gc_collect,
process_rss_mb,
)
# ============================================================
# df_memory_mb
# ============================================================
class TestDfMemoryMb:
    """df_memory_mb: empty/None frames report 0.0, any real frame reports > 0."""

    def test_empty_df_returns_zero(self):
        empty = pd.DataFrame()
        assert df_memory_mb(empty) == 0.0

    def test_none_returns_zero(self):
        assert df_memory_mb(None) == 0.0

    def test_small_df_returns_positive(self):
        # Mixed int/object columns so deep memory_usage is non-trivial.
        frame = pd.DataFrame({"a": range(100), "b": ["x"] * 100})
        assert df_memory_mb(frame) > 0.0
# ============================================================
# process_rss_mb
# ============================================================
class TestProcessRssMb:
    """process_rss_mb: returns current RSS in MB as float, or None on any failure.

    The helper is fail-open by design: a missing psutil or a raising
    `psutil.Process(os.getpid())` call must yield None, never propagate.
    """

    def test_returns_float(self):
        rss = process_rss_mb()
        # None is acceptable when psutil is not installed in the test env.
        assert rss is None or isinstance(rss, float)

    def test_returns_positive_when_psutil_available(self):
        rss = process_rss_mb()
        if rss is not None:
            assert rss > 0

    @patch("mes_dashboard.core.interactive_memory_guard.os.getpid", side_effect=Exception("fail"))
    def test_returns_none_on_exception(self, _mock):
        # os.getpid raising is caught inside the guard's try/except, so the
        # helper must fall back to None.  The previous assertion
        # (`None or isinstance(result, float)`) was vacuously true for every
        # possible return value and never exercised the failure path.
        result = process_rss_mb()
        assert result is None
# ============================================================
# enforce_dataset_memory_guard
# ============================================================
class TestEnforceDatasetMemoryGuard:
    """Two-fence guard: Fence 1 rejects on DataFrame size alone; Fence 2 rejects
    on projected RSS (current RSS + df_mb * working_set_factor).  Empty/None
    input and an unmeasurable RSS both pass (fail-open)."""

    def _make_df(self, n_rows: int = 100) -> pd.DataFrame:
        return pd.DataFrame({"a": range(n_rows), "b": ["test"] * n_rows})

    def test_empty_df_passes(self):
        # Empty frames short-circuit before either fence.
        enforce_dataset_memory_guard(pd.DataFrame(), operation="test", query_id="q1")

    def test_none_df_passes(self):
        enforce_dataset_memory_guard(None, operation="test", query_id="q1")

    def test_small_df_passes(self):
        tiny = self._make_df(10)
        enforce_dataset_memory_guard(tiny, operation="test", query_id="q1", max_input_mb=100)

    def test_fence1_rejects_large_df(self):
        tiny = self._make_df(10)
        # A near-zero limit forces Fence 1 to fire for any non-empty frame.
        with pytest.raises(MemoryError, match="超過"):
            enforce_dataset_memory_guard(
                tiny, operation="test_op", query_id="q1", max_input_mb=0.0001
            )

    def test_fence2_rejects_high_rss_projection(self):
        frame = self._make_df(100)
        # RSS pinned at 1000 MB against a 500 MB projected limit -> Fence 2 fires
        # even though Fence 1's input limit is effectively disabled.
        rss_patch = patch(
            "mes_dashboard.core.interactive_memory_guard.process_rss_mb",
            return_value=1000.0,
        )
        with rss_patch, pytest.raises(MemoryError, match="記憶體負載較高"):
            enforce_dataset_memory_guard(
                frame,
                operation="test_op",
                query_id="q1",
                max_input_mb=9999,
                max_projected_rss_mb=500,
                working_set_factor=1.0,
            )

    def test_fence2_passes_when_rss_unavailable(self):
        frame = self._make_df(100)
        # RSS unmeasurable (psutil missing) -> guard is fail-open, no raise
        # even with an absurdly low projected-RSS limit.
        rss_patch = patch(
            "mes_dashboard.core.interactive_memory_guard.process_rss_mb",
            return_value=None,
        )
        with rss_patch:
            enforce_dataset_memory_guard(
                frame,
                operation="test_op",
                query_id="q1",
                max_input_mb=9999,
                max_projected_rss_mb=1,
            )

    def test_operation_name_in_error_message(self):
        tiny = self._make_df(10)
        # The user-facing message must embed the operation label verbatim.
        with pytest.raises(MemoryError, match="匯出"):
            enforce_dataset_memory_guard(
                tiny, operation="匯出", query_id="q1", max_input_mb=0.0001
            )
# ============================================================
# maybe_gc_collect
# ============================================================
class TestMaybeGcCollect:
    """maybe_gc_collect: force=True triggers exactly one gc.collect, force=False none."""

    @patch("mes_dashboard.core.interactive_memory_guard.gc.collect")
    def test_force_true_runs_gc(self, gc_spy):
        maybe_gc_collect(force=True)
        gc_spy.assert_called_once()

    @patch("mes_dashboard.core.interactive_memory_guard.gc.collect")
    def test_force_false_skips_gc(self, gc_spy):
        maybe_gc_collect(force=False)
        gc_spy.assert_not_called()

View File

@@ -13,18 +13,25 @@ import pandas as pd
from mes_dashboard.services.material_trace_service import (
_add_exact_or_pattern_condition,
_check_memory_guard,
_compute_cache_key,
_enrich_workcenter_group,
_FORWARD_MAX_ROWS,
_IN_BATCH_SIZE,
_is_pattern_token,
_resolve_container_ids,
_resolve_workcenter_names,
_check_memory_guard,
_IN_BATCH_SIZE,
export_csv,
forward_query,
reverse_query,
)
from mes_dashboard.sql import QueryBuilder
# Common patch targets for Redis / cache / GC
_PATCH_REDIS_LOAD = "mes_dashboard.services.material_trace_service.redis_load_df"
_PATCH_REDIS_STORE = "mes_dashboard.services.material_trace_service.redis_store_df"
_PATCH_GC = "mes_dashboard.services.material_trace_service.maybe_gc_collect"
# ============================================================
# Fixtures
@@ -73,10 +80,13 @@ def _make_resolve_df(lot_names):
class TestForwardLotQuery:
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.read_sql_df")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_forward_lot_resolves_and_enriches(self, mock_mapping, mock_sql, mock_sql_slow):
def test_forward_lot_resolves_and_enriches(self, mock_mapping, mock_sql, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql.return_value = _make_resolve_df(["LOT-A", "LOT-B"])
mock_sql_slow.return_value = _make_material_df(5)
@@ -90,10 +100,13 @@ class TestForwardLotQuery:
assert mock_sql.call_count == 1
assert mock_sql_slow.call_count == 1
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.read_sql_df")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_forward_lot_all_unresolved_returns_empty(self, mock_mapping, mock_sql, mock_sql_slow):
def test_forward_lot_all_unresolved_returns_empty(self, mock_mapping, mock_sql, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql.return_value = pd.DataFrame()
@@ -111,9 +124,12 @@ class TestForwardLotQuery:
class TestForwardWorkorderQuery:
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_forward_workorder_queries_directly(self, mock_mapping, mock_sql_slow):
def test_forward_workorder_queries_directly(self, mock_mapping, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql_slow.return_value = _make_material_df(3)
@@ -130,9 +146,12 @@ class TestForwardWorkorderQuery:
class TestReverseQuery:
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_reverse_truncation_at_10000(self, mock_mapping, mock_sql_slow):
def test_reverse_truncation_at_10000(self, mock_mapping, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql_slow.return_value = _make_material_df(10001)
@@ -142,9 +161,12 @@ class TestReverseQuery:
assert result["meta"]["max_rows"] == 10000
assert result["pagination"]["total"] == 10000
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_reverse_no_truncation_under_limit(self, mock_mapping, mock_sql_slow):
def test_reverse_no_truncation_under_limit(self, mock_mapping, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql_slow.return_value = _make_material_df(500)
@@ -160,10 +182,13 @@ class TestReverseQuery:
class TestWorkcenterGroupFilter:
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.get_workcenters_for_groups")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_workcenter_group_resolves_to_names(self, mock_mapping, mock_for_groups, mock_sql_slow):
def test_workcenter_group_resolves_to_names(self, mock_mapping, mock_for_groups, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_for_groups.return_value = ["WC_DB_1", "WC_DB_2"]
mock_sql_slow.return_value = _make_material_df(3)
@@ -185,10 +210,13 @@ class TestWorkcenterGroupFilter:
class TestUnresolvedLots:
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.read_sql_df")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_partial_resolve_reports_unresolved(self, mock_mapping, mock_sql, mock_sql_slow):
def test_partial_resolve_reports_unresolved(self, mock_mapping, mock_sql, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
resolve_df = pd.DataFrame(
[{"CONTAINERID": "CID_LOT_A", "CONTAINERNAME": "LOT-A"}]
@@ -225,9 +253,11 @@ class TestEnrichWorkcenterGroup:
class TestExportCsv:
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_export_returns_utf8_bom_csv(self, mock_mapping, mock_sql_slow):
def test_export_returns_utf8_bom_csv(self, mock_mapping, mock_sql_slow, _rl, _rs):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql_slow.return_value = _make_material_df(3)
@@ -248,6 +278,7 @@ class TestExportCsv:
class TestMemoryGuard:
def test_memory_guard_raises_on_large_df(self):
"""_check_memory_guard delegates to enforce_dataset_memory_guard; test via low limit."""
with patch("mes_dashboard.services.material_trace_service._MAX_RESULT_MB", 0):
df = _make_material_df(10)
with pytest.raises(MemoryError, match="超過.*上限"):
@@ -257,6 +288,17 @@ class TestMemoryGuard:
df = _make_material_df(5)
_check_memory_guard(df)
def test_memory_guard_rss_projection(self):
"""Fence 2: reject when projected RSS exceeds limit (1100 MB).
With a 5-row DF (~0.003 MB), projected = RSS + 0.003*1.8 ≈ RSS+0.006.
Set RSS high enough that projected > 1100 MB.
"""
df = _make_material_df(5)
with patch("mes_dashboard.core.interactive_memory_guard.process_rss_mb", return_value=1100.0):
with pytest.raises(MemoryError, match="記憶體負載"):
_check_memory_guard(df)
# ============================================================
# Safeguards: IN-clause batching
@@ -264,9 +306,12 @@ class TestMemoryGuard:
class TestInClauseBatching:
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_large_input_is_batched(self, mock_mapping, mock_sql_slow):
def test_large_input_is_batched(self, mock_mapping, mock_sql_slow, _rl, _rs, _gc):
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql_slow.return_value = _make_material_df(3)
@@ -337,9 +382,12 @@ class TestWildcardResolve:
class TestWildcardWorkorder:
@patch(_PATCH_GC)
@patch(_PATCH_REDIS_STORE)
@patch(_PATCH_REDIS_LOAD, return_value=None)
@patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
@patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
def test_workorder_wildcard_generates_like(self, mock_mapping, mock_sql_slow):
def test_workorder_wildcard_generates_like(self, mock_mapping, mock_sql_slow, _rl, _rs, _gc):
"""Wildcard work orders produce LIKE clause in query SQL."""
mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
mock_sql_slow.return_value = _make_material_df(3)
@@ -348,3 +396,163 @@ class TestWildcardWorkorder:
sql_text = mock_sql_slow.call_args[0][0]
assert "LIKE" in sql_text
# ============================================================
# Redis cache tests
# ============================================================
class TestRedisCache:
    """Redis query-cache behaviour for material trace.

    Contract under test:
      * cache keys are deterministic, order-insensitive, and mode-scoped
        under the ``mt:result:`` prefix;
      * on a cache hit, Oracle helpers are never invoked and the result is
        NOT re-stored;
      * on a cache miss, the queried result is stored with ttl=300 (5 min).

    NOTE(review): decorator order is significant — @patch applies bottom-up,
    so parameters map left-to-right starting from the bottom decorator.
    """

    def test_compute_cache_key_deterministic(self):
        """Same params produce the same cache key."""
        # Values and groups are given in different orders on purpose:
        # the key must be order-insensitive so equivalent queries share a slot.
        k1 = _compute_cache_key("workorder", ["WO-B", "WO-A"], ["G2", "G1"])
        k2 = _compute_cache_key("workorder", ["WO-A", "WO-B"], ["G1", "G2"])
        assert k1 == k2
        assert k1.startswith("mt:result:")

    def test_compute_cache_key_differs_by_mode(self):
        # Identical values under different query modes must not collide.
        k1 = _compute_cache_key("lot", ["V1"])
        k2 = _compute_cache_key("workorder", ["V1"])
        assert k1 != k2

    @patch(_PATCH_GC)
    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD)
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_forward_cache_hit_skips_oracle(self, mock_mapping, mock_redis_load, mock_redis_store, _gc):
        """When Redis has cached data, Oracle is never queried."""
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        # Cached frame already carries WORKCENTER_GROUP, i.e. it was enriched
        # before being stored — no enrichment work expected on the hit path.
        cached = _make_material_df(5)
        cached = cached.copy()
        cached["WORKCENTER_GROUP"] = "焊接_DB"
        mock_redis_load.return_value = cached
        result = forward_query("workorder", ["WO-001"], page=1, per_page=50)
        assert result["pagination"]["total"] == 5
        assert len(result["rows"]) == 5
        # redis_store should NOT be called on cache hit
        mock_redis_store.assert_not_called()

    @patch(_PATCH_GC)
    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD)
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_reverse_cache_hit_skips_oracle(self, mock_mapping, mock_redis_load, mock_redis_store, _gc):
        """Reverse query also uses Redis cache."""
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        cached = _make_material_df(3)
        cached = cached.copy()
        cached["WORKCENTER_GROUP"] = "焊接_DB"
        mock_redis_load.return_value = cached
        result = reverse_query(["MLOT-A"], page=1, per_page=50)
        assert result["pagination"]["total"] == 3
        mock_redis_store.assert_not_called()

    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD)
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_export_cache_hit_skips_oracle(self, mock_mapping, mock_redis_load, mock_redis_store):
        """Export uses cached data when available."""
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        cached = _make_material_df(3)
        cached = cached.copy()
        cached["WORKCENTER_GROUP"] = "焊接_DB"
        mock_redis_load.return_value = cached
        csv_bytes, meta = export_csv("workorder", ["WO-001"])
        # UTF-8 BOM prefix so Excel opens the CSV with correct encoding.
        assert csv_bytes[:3] == b"\xef\xbb\xbf"
        csv_text = csv_bytes.decode("utf-8-sig")
        assert "LOT ID" in csv_text
        # Should NOT re-store already cached data
        mock_redis_store.assert_not_called()

    @patch(_PATCH_GC)
    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD, return_value=None)
    @patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_forward_cache_miss_stores_result(self, mock_mapping, mock_sql_slow, _rl, mock_redis_store, _gc):
        """On cache miss, result is stored in Redis after query."""
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        mock_sql_slow.return_value = _make_material_df(3)
        forward_query("workorder", ["WO-001"], page=1, per_page=50)
        mock_redis_store.assert_called_once()
        call_args = mock_redis_store.call_args
        # Stored under the material-trace key namespace with the 5-minute TTL.
        assert call_args[0][0].startswith("mt:result:")
        assert call_args[1]["ttl"] == 300
# ============================================================
# Forward truncation tests (MT4)
# ============================================================
class TestForwardTruncation:
    """Forward-query row-cap guard (MT4).

    SQL fetches up to 50001 rows (FETCH FIRST 50001); the service truncates to
    _FORWARD_MAX_ROWS and flags ``meta.truncated`` when the sentinel extra row
    is present.  Under the limit, no ``truncated`` key appears at all.
    """

    @patch(_PATCH_GC)
    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD, return_value=None)
    @patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_forward_truncation_at_50000(self, mock_mapping, mock_sql_slow, _rl, _rs, _gc):
        # 50001 rows = one past the cap, proving detection via the extra row.
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        mock_sql_slow.return_value = _make_material_df(50001)
        result = forward_query("workorder", ["WO-001"], page=1, per_page=50)
        assert result["meta"]["truncated"] is True
        assert result["meta"]["max_rows"] == _FORWARD_MAX_ROWS
        assert result["pagination"]["total"] == _FORWARD_MAX_ROWS

    @patch(_PATCH_GC)
    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD, return_value=None)
    @patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_forward_no_truncation_under_limit(self, mock_mapping, mock_sql_slow, _rl, _rs, _gc):
        # Well under the cap: full total reported, no truncated flag emitted.
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        mock_sql_slow.return_value = _make_material_df(500)
        result = forward_query("workorder", ["WO-001"], page=1, per_page=50)
        assert "truncated" not in result["meta"]
        assert result["pagination"]["total"] == 500
# ============================================================
# GC collect tests (MT5)
# ============================================================
class TestGcCollect:
    """Forced GC after batched queries (MT5).

    Both forward and reverse query paths must invoke maybe_gc_collect exactly
    once per call, releasing intermediate DataFrame memory promptly.
    """

    @patch(_PATCH_GC)
    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD, return_value=None)
    @patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_forward_query_calls_gc(self, mock_mapping, mock_sql_slow, _rl, _rs, mock_gc):
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        mock_sql_slow.return_value = _make_material_df(3)
        forward_query("workorder", ["WO-001"], page=1, per_page=50)
        # Exactly one GC pass per query — not per internal batch.
        mock_gc.assert_called_once()

    @patch(_PATCH_GC)
    @patch(_PATCH_REDIS_STORE)
    @patch(_PATCH_REDIS_LOAD, return_value=None)
    @patch("mes_dashboard.services.material_trace_service.read_sql_df_slow")
    @patch("mes_dashboard.services.material_trace_service.get_workcenter_mapping")
    def test_reverse_query_calls_gc(self, mock_mapping, mock_sql_slow, _rl, _rs, mock_gc):
        mock_mapping.return_value = MOCK_WORKCENTER_MAPPING
        mock_sql_slow.return_value = _make_material_df(3)
        reverse_query(["MLOT-A"], page=1, per_page=50)
        mock_gc.assert_called_once()

View File

@@ -10,6 +10,7 @@ from unittest.mock import MagicMock
import pandas as pd
import pytest
from mes_dashboard.core import interactive_memory_guard as _img
from mes_dashboard.services import reject_dataset_cache as cache_svc
@@ -493,7 +494,7 @@ def test_compute_batch_pareto_memory_guard_rejects_large_cached_dataset(monkeypa
df = _build_detail_filter_df()
monkeypatch.setattr(cache_svc, "_get_cached_df", lambda _query_id: df)
monkeypatch.setattr(cache_svc, "_df_memory_mb", lambda _df: 128.0)
monkeypatch.setattr(_img, "df_memory_mb", lambda _df: 128.0)
monkeypatch.setattr(cache_svc, "_REJECT_DERIVE_MAX_INPUT_MB", 64)
with pytest.raises(MemoryError, match="超過 64 MB 上限"):
@@ -509,8 +510,8 @@ def test_compute_batch_pareto_memory_guard_allows_after_filter_narrowing(monkeyp
monkeypatch.setattr(cache_svc, "_get_cached_df", lambda _query_id: df)
monkeypatch.setattr(
cache_svc,
"_df_memory_mb",
_img,
"df_memory_mb",
lambda frame: 128.0 if len(frame.index) > 1 else 16.0,
)
monkeypatch.setattr(cache_svc, "_REJECT_DERIVE_MAX_INPUT_MB", 64)
@@ -547,7 +548,7 @@ def test_compute_batch_pareto_memory_guard_uses_compacted_pareto_frame(monkeypat
)
return 128.0 if has_object_dim else 16.0
monkeypatch.setattr(cache_svc, "_df_memory_mb", fake_df_memory_mb)
monkeypatch.setattr(_img, "df_memory_mb", fake_df_memory_mb)
monkeypatch.setattr(cache_svc, "_REJECT_DERIVE_MAX_INPUT_MB", 64)
result = cache_svc.compute_batch_pareto(

View File

@@ -15,15 +15,20 @@ import mes_dashboard.services.trace_job_service as tjs
# is_async_available
# ---------------------------------------------------------------------------
def test_is_async_available_true():
"""Should return True when rq is importable and Redis is up."""
"""Should return True when rq is importable, Redis is up, and workers exist."""
tjs._RQ_AVAILABLE = None # reset cached flag
with patch.object(tjs, "get_redis_client", return_value=MagicMock()):
tjs._rq_health_cache["available"] = None # reset health cache
mock_conn = MagicMock()
with patch.object(tjs, "get_redis_client", return_value=mock_conn), \
patch("rq.Worker") as mock_worker_cls:
mock_worker_cls.all.return_value = [MagicMock()] # simulate one worker
assert tjs.is_async_available() is True
def test_is_async_available_false_no_redis():
"""Should return False when Redis is unavailable."""
tjs._RQ_AVAILABLE = True
tjs._rq_health_cache["available"] = None # reset health cache
with patch.object(tjs, "get_redis_client", return_value=None):
assert tjs.is_async_available() is False