feat(trace-pipeline): memory triage, async job queue, and NDJSON streaming

Three proposals addressing the 2026-02-25 trace pipeline OOM crash (114K CIDs):

1. trace-events-memory-triage: fetchmany iterator (read_sql_df_slow_iter),
   admission control (50K CID limit for non-MSD), cache skip for large queries,
   early memory release with gc.collect()

2. trace-async-job-queue: RQ-based async jobs for queries >20K CIDs,
   separate worker process with isolated memory, frontend polling via
   useTraceProgress composable, systemd service + deploy scripts

3. trace-streaming-response: chunked Redis storage (TRACE_STREAM_BATCH_SIZE=5000),
   NDJSON stream endpoint (GET /api/trace/job/<id>/stream), frontend
   ReadableStream consumer for progressive rendering, backward-compatible
   with legacy single-key storage

All three proposals archived. 1101 tests pass, frontend builds clean.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
egg
2026-02-25 21:01:27 +08:00
parent cbb943dfe5
commit dbe0da057c
32 changed files with 3140 additions and 87 deletions

View File

@@ -85,7 +85,9 @@ LOCAL_AUTH_PASSWORD=
# Server bind address and port
GUNICORN_BIND=0.0.0.0:8080
# Number of worker processes (recommend: 2 * CPU cores + 1)
# Number of worker processes
# Recommend: 2 for ≤ 8GB RAM (trace queries consume 2-3 GB peak per worker)
# Recommend: 4 for ≥ 16GB RAM
GUNICORN_WORKERS=2
# Threads per worker
@@ -168,14 +170,55 @@ CIRCUIT_BREAKER_WINDOW_SIZE=10
TRACE_SLOW_THRESHOLD_SECONDS=15
# Max parallel workers for events domain fetching (per request)
TRACE_EVENTS_MAX_WORKERS=4
# Recommend: 2 (each worker × EVENT_FETCHER_MAX_WORKERS = peak slow query slots)
TRACE_EVENTS_MAX_WORKERS=2
# Max parallel workers for EventFetcher batch queries (per domain)
EVENT_FETCHER_MAX_WORKERS=4
# Recommend: 2 (peak concurrent slow queries = TRACE_EVENTS_MAX_WORKERS × this)
EVENT_FETCHER_MAX_WORKERS=2
# Max parallel workers for forward pipeline WIP+rejects fetching
FORWARD_PIPELINE_MAX_WORKERS=2
# --- Admission Control (提案 1: trace-events-memory-triage) ---
# Max container IDs per synchronous events request.
# Requests exceeding this limit return HTTP 413 (or HTTP 202 when async job queue is enabled).
# Set based on available RAM: 50K CIDs ≈ 2-3 GB peak memory per request.
TRACE_EVENTS_CID_LIMIT=50000
# Cursor fetchmany batch size for slow query iterator mode.
# Smaller = less peak memory; larger = fewer Oracle round-trips.
DB_SLOW_FETCHMANY_SIZE=5000
# Domain-level cache skip threshold (CID count).
# When CID count exceeds this, per-domain and route-level cache writes are skipped.
EVENT_FETCHER_CACHE_SKIP_CID_THRESHOLD=10000
# --- Async Job Queue (提案 2: trace-async-job-queue) ---
# Enable RQ trace worker for async large query processing
# Set to true and start the worker: rq worker trace-events
TRACE_WORKER_ENABLED=false
# CID threshold for automatic async job routing (requires RQ worker).
# Requests with CID count > threshold are queued instead of processed synchronously.
TRACE_ASYNC_CID_THRESHOLD=20000
# Job result retention time in seconds (default: 3600 = 1 hour)
TRACE_JOB_TTL_SECONDS=3600
# Job execution timeout in seconds (default: 1800 = 30 minutes)
TRACE_JOB_TIMEOUT_SECONDS=1800
# Number of RQ worker processes for trace jobs
TRACE_WORKER_COUNT=1
# RQ queue name for trace jobs
TRACE_WORKER_QUEUE=trace-events
# --- Streaming Response (提案 3: trace-streaming-response) ---
# NDJSON stream batch size (records per NDJSON line)
TRACE_STREAM_BATCH_SIZE=5000
# ============================================================
# Performance Metrics Configuration
# ============================================================

View File

@@ -0,0 +1,37 @@
[Unit]
Description=MES Dashboard Trace Worker (RQ, Conda Runtime)
Documentation=https://github.com/your-org/mes-dashboard
After=network.target redis-server.service
Requires=redis-server.service
[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/opt/mes-dashboard
EnvironmentFile=-/opt/mes-dashboard/.env
Environment="PYTHONPATH=/opt/mes-dashboard/src"
ExecStart=/usr/bin/env bash -lc 'exec "${CONDA_BIN:-/opt/miniconda3/bin/conda}" run --no-capture-output -n "${CONDA_ENV_NAME:-mes-dashboard}" rq worker "${TRACE_WORKER_QUEUE:-trace-events}" --url "${REDIS_URL:-redis://localhost:6379/0}"'
KillSignal=SIGTERM
TimeoutStopSec=60
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=mes-dashboard-trace-worker
# Memory protection: trace worker handles large queries independently.
# MemoryMax prevents single large job from killing the VM.
MemoryHigh=3G
MemoryMax=4G
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ReadWritePaths=/opt/mes-dashboard/logs
[Install]
WantedBy=multi-user.target

View File

@@ -27,6 +27,14 @@ StandardOutput=journal
StandardError=journal
SyslogIdentifier=mes-dashboard
# Memory protection (cgroup v2): prevents OOM from killing entire VM.
# MemoryHigh: soft limit — kernel starts reclaiming when exceeded (service stays alive).
# MemoryMax: hard limit — OOM kills only this service (host OS survives).
# Adjust based on VM RAM: MemoryHigh ≈ 70% of VM RAM, MemoryMax ≈ 85% of VM RAM.
# For 7GB VM: MemoryHigh=5G, MemoryMax=6G (leaves ~1GB for OS + Redis).
MemoryHigh=5G
MemoryMax=6G
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict

View File

@@ -34,6 +34,9 @@ dependencies:
- redis>=5.0.0,<6.0.0
- hiredis>=2.0.0,<4.0.0 # C parser for better performance
# Task Queue (async trace jobs)
- rq>=1.16.0,<2.0.0
# HTTP Client
- requests>=2.28.0,<3.0.0

View File

@@ -1,10 +1,12 @@
import { reactive, ref } from 'vue';
import { apiPost, ensureMesApiAvailable } from '../core/api.js';
import { apiGet, apiPost, ensureMesApiAvailable } from '../core/api.js';
ensureMesApiAvailable();
const DEFAULT_STAGE_TIMEOUT_MS = 360000;
const JOB_POLL_INTERVAL_MS = 3000;
const JOB_POLL_MAX_MS = 1800000; // 30 minutes
const PROFILE_DOMAINS = Object.freeze({
query_tool: ['history', 'materials', 'rejects', 'holds', 'jobs'],
mid_section_defect: ['upstream_history', 'materials'],
@@ -71,6 +73,107 @@ function collectAllContainerIds(seedContainerIds, lineagePayload, direction) {
return merged;
}
// Resolve after `ms` milliseconds (promisified setTimeout).
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
 * Poll an async trace job until it completes or fails.
 *
 * @param {string} statusUrl - The job status endpoint URL
 * @param {object} options - { signal, onProgress }
 * @returns {Promise<string>} The final status ('finished' or throws)
 */
async function pollJobUntilComplete(statusUrl, { signal, onProgress } = {}) {
  const startedAt = Date.now();
  // Loop until the job reaches a terminal state, the caller aborts,
  // or the overall polling budget (JOB_POLL_MAX_MS) is exhausted.
  for (;;) {
    if (signal?.aborted) {
      throw new DOMException('Aborted', 'AbortError');
    }
    const statusPayload = await apiGet(statusUrl, { timeout: 15000, signal });
    if (typeof onProgress === 'function') {
      onProgress(statusPayload);
    }
    if (statusPayload.status === 'finished') {
      return 'finished';
    }
    if (statusPayload.status === 'failed') {
      const failure = new Error(statusPayload.error || '非同步查詢失敗');
      failure.errorCode = 'JOB_FAILED';
      throw failure;
    }
    // Budget check happens after the status fetch so a job that finishes
    // exactly at the deadline is still reported as finished.
    if (Date.now() - startedAt > JOB_POLL_MAX_MS) {
      const timeoutError = new Error('非同步查詢超時');
      timeoutError.errorCode = 'JOB_POLL_TIMEOUT';
      throw timeoutError;
    }
    await sleep(JOB_POLL_INTERVAL_MS);
  }
}
/**
 * Consume an NDJSON stream from the server, calling onChunk for each line.
 *
 * Each non-empty line is parsed as JSON; malformed lines are skipped
 * silently (best-effort, matching the server's line-oriented contract).
 *
 * @param {string} url - The stream endpoint URL
 * @param {object} options - { signal, onChunk }
 * @returns {Promise<void>}
 */
async function consumeNDJSONStream(url, { signal, onChunk } = {}) {
  const response = await fetch(url, { signal });
  if (!response.ok) {
    const text = await response.text().catch(() => '');
    const error = new Error(`Stream request failed: HTTP ${response.status} ${text}`);
    error.errorCode = 'STREAM_FAILED';
    throw error;
  }
  // Guard: the fetch spec allows a null body (e.g. 204 responses);
  // surface the function's own error code instead of a raw TypeError.
  if (!response.body) {
    const error = new Error('Stream request failed: empty response body');
    error.errorCode = 'STREAM_FAILED';
    throw error;
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  // Parse one NDJSON line and forward it to onChunk (skip blanks/garbage).
  const emit = (raw) => {
    const trimmed = raw.trim();
    if (!trimmed) return;
    try {
      const chunk = JSON.parse(trimmed);
      if (typeof onChunk === 'function') onChunk(chunk);
    } catch {
      // skip malformed NDJSON lines
    }
  };
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep incomplete last line
      for (const line of lines) emit(line);
    }
    // Flush any bytes the decoder buffered from a multi-byte sequence
    // split across the final chunk (missing flush would drop them).
    buffer += decoder.decode();
    // process remaining buffer (final line without trailing newline)
    emit(buffer);
  } finally {
    reader.releaseLock();
  }
}
export function useTraceProgress({ profile } = {}) {
const current_stage = ref(null);
const completed_stages = ref([]);
@@ -88,6 +191,15 @@ export function useTraceProgress({ profile } = {}) {
events: null,
});
// Async job progress (populated when events stage uses async path)
const job_progress = reactive({
active: false,
job_id: null,
status: null,
elapsed_seconds: 0,
progress: '',
});
let activeController = null;
function reset() {
@@ -99,6 +211,11 @@ export function useTraceProgress({ profile } = {}) {
stage_errors.seed = null;
stage_errors.lineage = null;
stage_errors.events = null;
job_progress.active = false;
job_progress.job_id = null;
job_progress.status = null;
job_progress.elapsed_seconds = 0;
job_progress.progress = '';
}
function abort() {
@@ -173,7 +290,70 @@ export function useTraceProgress({ profile } = {}) {
},
{ timeout: DEFAULT_STAGE_TIMEOUT_MS, signal: controller.signal },
);
// Async path: server returned 202 with job_id
if (eventsPayload?.async === true && eventsPayload?.status_url) {
job_progress.active = true;
job_progress.job_id = eventsPayload.job_id;
job_progress.status = 'queued';
// Phase 1: poll until job finishes
await pollJobUntilComplete(eventsPayload.status_url, {
signal: controller.signal,
onProgress: (status) => {
job_progress.status = status.status;
job_progress.elapsed_seconds = status.elapsed_seconds || 0;
job_progress.progress = status.progress || '';
},
});
// Phase 2: stream result via NDJSON (or fall back to full result)
const streamUrl = eventsPayload.stream_url;
if (streamUrl) {
job_progress.progress = 'streaming';
const streamedResult = { stage: 'events', results: {}, aggregation: null };
let totalRecords = 0;
await consumeNDJSONStream(streamUrl, {
signal: controller.signal,
onChunk: (chunk) => {
if (chunk.type === 'domain_start') {
streamedResult.results[chunk.domain] = { data: [], count: 0, total: chunk.total };
} else if (chunk.type === 'records' && streamedResult.results[chunk.domain]) {
const domainResult = streamedResult.results[chunk.domain];
domainResult.data.push(...chunk.data);
domainResult.count = domainResult.data.length;
totalRecords += chunk.data.length;
job_progress.progress = `streaming: ${totalRecords} records`;
} else if (chunk.type === 'aggregation') {
streamedResult.aggregation = chunk.data;
} else if (chunk.type === 'warning') {
streamedResult.error = chunk.code;
streamedResult.failed_domains = chunk.failed_domains;
} else if (chunk.type === 'full_result') {
// Legacy fallback: server sent full result as single chunk
Object.assign(streamedResult, chunk.data);
}
},
});
stage_results.events = streamedResult;
} else {
// No stream_url: fall back to fetching full result
const resultUrl = `${eventsPayload.status_url}/result`;
stage_results.events = await apiGet(resultUrl, {
timeout: DEFAULT_STAGE_TIMEOUT_MS,
signal: controller.signal,
});
}
job_progress.active = false;
} else {
// Sync path
stage_results.events = eventsPayload;
}
completed_stages.value = [...completed_stages.value, 'events'];
return stage_results;
} catch (error) {
@@ -192,6 +372,7 @@ export function useTraceProgress({ profile } = {}) {
}
current_stage.value = null;
is_running.value = false;
job_progress.active = false;
}
}
@@ -200,6 +381,7 @@ export function useTraceProgress({ profile } = {}) {
completed_stages,
stage_results,
stage_errors,
job_progress,
is_running,
execute,
reset,

View File

@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-02-25

View File

@@ -0,0 +1,149 @@
## Context
提案 1trace-events-memory-triage解決了峰值記憶體問題並加入 admission control
但 CID > 50K 的查詢被直接拒絕HTTP 413
使用者仍有合理需求查詢大範圍資料(例如 TMTT 站 5 個月 = 114K CIDs
目前 codebase 完全沒有非同步任務基礎設施(無 Celery、RQ、Dramatiq
所有操作都是同步 request-response受 gunicorn 360s timeout 硬限。
需要引入輕量級 job queue讓大查詢在獨立 worker 進程中執行,
不佔 gunicorn thread、不受 360s timeout 限制、失敗可重試。
## Goals / Non-Goals
**Goals:**
- CID > 閾值的 trace events 查詢改走非同步 jobAPI 回 202 + job_id
- 獨立 worker 進程systemd unit不佔 gunicorn 資源
- Job 狀態可查詢queued/started/finished/failed
- 結果有 TTL 自動清理,不佔 Redis 長期記憶體
- 前端自動判斷同步/非同步路徑,顯示 job 進度
- 最小新依賴(利用既有 Redis
**Non-Goals:**
- 不做通用 task queue只處理 trace events
- 不做 job 重試(大查詢重試消耗巨大,失敗後使用者手動重新觸發)
- 不做 job 取消Oracle 查詢一旦發出難以取消)
- 不做 job 持久化到 DBRedis TTL 足夠)
- 不修改 lineage 階段(仍然同步,通常 < 120s
## Decisions
### D1: RQRedis Queue而非 Celery/Dramatiq
**決策**使用 RQ 作為 job queue
**理由**
- 專案已有 Redis零額外基礎設施
- RQ Celery 輕量 10 broker 中間層 beat scheduler flower
- RQ worker 是獨立 Python 進程記憶體隔離
- API 簡單`queue.enqueue(func, args, job_timeout=600, result_ttl=3600)`
- 社群活躍Flask 生態整合良好
**替代方案**
- Celery過重專案不需要 beatchordchain 等功能 rejected
- Dramatiq更輕量但社群較小Redis broker 整合不如 RQ 成熟 rejected
- 自製 threading前面討論已排除worker 生命週期記憶體競爭)→ rejected
### D2: 同步/非同步分界閾值
**決策**
| CID 數量 | 行為 |
|-----------|------|
| 20,000 | 同步處理現有 events endpoint |
| 20,001 ~ 50,000 | 非同步 job 202 + job_id |
| > 50,000 | 非同步 job回 202 + job_idworker 內部分段處理 |
**env var**`TRACE_ASYNC_CID_THRESHOLD`(預設 20000
**理由**
- ≤ 20K CIDs 的 events 查詢通常在 60s 內完成,同步足夠
- 20K-50K 需要 2-5 分鐘,超出使用者耐心且佔住 gunicorn thread
- > 50K 是提案 1 的 admission control 上限,必須走非同步
**提案 1 的 HTTP 413 改為 HTTP 202**
當提案 2 實作完成後,提案 1 的 `TRACE_EVENTS_CID_LIMIT` 檢查改為自動 fallback 到 async job
不再拒絕請求。
### D3: Job 狀態與結果儲存
**決策**:使用 RQ 內建的 job 狀態追蹤(儲存在 Redis
```
Job lifecycle:
queued → started → finished / failed
Redis keys:
rq:job:{job_id} # RQ 內建 job metadata
trace:job:{job_id}:meta # 自訂 metadataprofile, cid_count, domains, progress
trace:job:{job_id}:result # 完成後的結果JSON設 TTL
```
**env vars**
- `TRACE_JOB_TTL_SECONDS`:結果保留時間(預設 3600 = 1 小時)
- `TRACE_JOB_TIMEOUT_SECONDS`:單一 job 最大執行時間(預設 1800 = 30 分鐘)
### D4: API 設計
```
POST /api/trace/events ← 現有CID ≤ 閾值時同步
POST /api/trace/events ← CID > 閾值時回 202 + job_id同一 endpoint
GET /api/trace/job/{job_id} ← 查詢 job 狀態
GET /api/trace/job/{job_id}/result ← 取得完整結果
GET /api/trace/job/{job_id}/result?domain=history&offset=0&limit=5000 ← 分頁取結果
```
**202 回應格式**
```json
{
"stage": "events",
"async": true,
"job_id": "trace-evt-abc123",
"status_url": "/api/trace/job/trace-evt-abc123",
"estimated_seconds": 300
}
```
### D5: Worker 部署架構
```
systemd (mes-dashboard-trace-worker.service)
→ conda run -n mes-dashboard rq worker trace-events
→ 獨立進程,獨立記憶體空間
→ MemoryMax=4Gcgroup 保護)
```
**env vars**
- `TRACE_WORKER_COUNT`worker 進程數(預設 1
- `TRACE_WORKER_QUEUE`queue 名稱(預設 `trace-events`
### D6: 前端整合
`useTraceProgress.js` 修改:
```javascript
// events 階段
const eventsResp = await fetchStage('events', payload)
if (eventsResp.status === 202) {
// 非同步路徑
const { job_id, status_url } = eventsResp.data
return await pollJobUntilComplete(status_url, {
onProgress: (status) => updateProgress('events', status.progress),
pollInterval: 3000,
maxPollTime: 1800000, // 30 分鐘
})
}
// 同步路徑(現有)
return eventsResp.data
```
## Risks / Trade-offs
| 風險 | 緩解措施 |
|------|---------|
| RQ 新依賴增加維護成本 | RQ 穩定、API 簡單、只用核心功能 |
| Worker 進程增加記憶體使用 | 獨立 cgroup MemoryMax=4G空閒時幾乎不佔記憶體 |
| Redis 儲存大結果影響效能 | 結果 TTL=1h 自動清理;配合提案 3 串流取代全量儲存 |
| Worker crash 丟失進行中 job | RQ 內建 failed job registry使用者可手動重觸發 |
| 前端輪詢增加 API 負載 | pollInterval=3s只有 active job 才輪詢 |

View File

@@ -0,0 +1,45 @@
## Why
trace pipeline 處理大量 CIDs> 20K即使經過分批處理優化提案 1
仍然面臨以下根本問題:
1. **同步 request-response 模型**gunicorn 360s timeout 是硬限lineage + events 合計可能超過 300s
2. **worker thread 被佔住**:大查詢期間 1 個 gunicorn thread 完全被佔用,降低即時頁服務能力
3. **前端無進度回饋**:使用者只能盯著 loading spinner 等 5-6 分鐘,不知道是否正常運作
4. **失敗後需完全重新執行**:中途 timeout/OOM 後,已完成的 seed-resolve 和 lineage 結果全部浪費
業界標準做法是將長時間任務放入非同步佇列RQ/DramatiqAPI 先回 202 + job_id
背景 worker 獨立處理,前端輪詢或 SSE 取得結果。
## What Changes
- **引入 RQRedis Queue**:利用既有 Redis 基礎設施,最小化新依賴
- **新增 trace job worker**獨立進程systemd unit不佔 gunicorn worker 資源
- **`POST /api/trace/events` 自動非同步路由**CID > 閾值時回 202 + job_id同一 endpoint非新 endpoint
- **新增 `GET /api/trace/job/{job_id}`**:輪詢 job 狀態queued/started/finished/failed
- **新增 `GET /api/trace/job/{job_id}/result`**:取得完成後的結果(分頁)
- **前端 useTraceProgress 整合**:自動判斷同步/非同步路徑,顯示 job 進度
- **Job TTL + 自動清理**:結果保留 1 小時後自動過期
- **新增 systemd unit**`mes-dashboard-trace-worker.service`
- **更新 .env.example**`TRACE_ASYNC_CID_THRESHOLD``TRACE_JOB_TTL_SECONDS``TRACE_WORKER_COUNT`
## Capabilities
### New Capabilities
- `trace-async-job`: 非同步 trace job 佇列RQ + Redis
### Modified Capabilities
- `trace-staged-api`: events endpoint 整合 async job 路由
- `progressive-trace-ux`: 前端整合 job 輪詢 + 進度顯示
## Impact
- **新增依賴**`rq>=1.16.0,<2.0.0`requirements.txt、environment.yml
- **後端新增**trace_job_service.py、trace_routes.pyasync endpoints
- **前端修改**useTraceProgress.jsasync 整合)
- **部署新增**deploy/mes-dashboard-trace-worker.service、scripts/start_server.shworker 管理)
- **部署設定**.env.example新 env vars
- **不影響**:其他 service、即時監控頁、admin 頁面
- **前置條件**trace-events-memory-triage提案 1

View File

@@ -0,0 +1,47 @@
## ADDED Requirements
### Requirement: Trace events endpoint SHALL support asynchronous job execution
The `/api/trace/events` endpoint SHALL automatically route large CID requests to an async job queue.
#### Scenario: CID count exceeds async threshold
- **WHEN** the events endpoint receives a request with `container_ids` count exceeding `TRACE_ASYNC_CID_THRESHOLD` (env: `TRACE_ASYNC_CID_THRESHOLD`, default: 20000)
- **THEN** the endpoint SHALL enqueue the request to the `trace-events` RQ queue
- **THEN** the endpoint SHALL return HTTP 202 with `{ "stage": "events", "async": true, "job_id": "...", "status_url": "/api/trace/job/{job_id}" }`
#### Scenario: CID count within sync threshold
- **WHEN** the events endpoint receives a request with `container_ids` count ≤ `TRACE_ASYNC_CID_THRESHOLD`
- **THEN** the endpoint SHALL process synchronously as before
### Requirement: Trace API SHALL expose job status endpoint
`GET /api/trace/job/{job_id}` SHALL return the current status of an async trace job.
#### Scenario: Job status query
- **WHEN** a client queries job status with a valid job_id
- **THEN** the endpoint SHALL return `{ "job_id": "...", "status": "queued|started|finished|failed", "progress": {...}, "created_at": "...", "elapsed_seconds": N }`
#### Scenario: Job not found
- **WHEN** a client queries job status with an unknown or expired job_id
- **THEN** the endpoint SHALL return HTTP 404 with `{ "error": "...", "code": "JOB_NOT_FOUND" }`
### Requirement: Trace API SHALL expose job result endpoint
`GET /api/trace/job/{job_id}/result` SHALL return the result of a completed async trace job.
#### Scenario: Completed job result
- **WHEN** a client requests result for a completed job
- **THEN** the endpoint SHALL return the same response format as the synchronous events endpoint
- **THEN** optional query params `domain`, `offset`, `limit` SHALL support pagination
#### Scenario: Job not yet completed
- **WHEN** a client requests result for a non-completed job
- **THEN** the endpoint SHALL return HTTP 409 with `{ "error": "...", "code": "JOB_NOT_COMPLETE", "status": "queued|started" }`
### Requirement: Async trace jobs SHALL have TTL and timeout
Job results SHALL expire after a configurable TTL, and execution SHALL be bounded by a timeout.
#### Scenario: Job result TTL
- **WHEN** a trace job completes (success or failure)
- **THEN** the result SHALL be stored in Redis with TTL = `TRACE_JOB_TTL_SECONDS` (env, default: 3600)
#### Scenario: Job execution timeout
- **WHEN** a trace job exceeds `TRACE_JOB_TIMEOUT_SECONDS` (env, default: 1800)
- **THEN** RQ SHALL terminate the job and mark it as failed

View File

@@ -0,0 +1,39 @@
## 1. Dependencies
- [x] 1.1 Add `rq>=1.16.0,<2.0.0` to `requirements.txt`
- [x] 1.2 Add `rq>=1.16.0,<2.0.0` to pip dependencies in `environment.yml`
## 2. Trace Job Service
- [x] 2.1 Create `src/mes_dashboard/services/trace_job_service.py` with `enqueue_trace_events_job()`, `get_job_status()`, `get_job_result()`
- [x] 2.2 Implement `execute_trace_events_job()` function (RQ worker entry point): runs EventFetcher + optional MSD aggregation, stores result in Redis with TTL
- [x] 2.3 Add job metadata tracking: `trace:job:{job_id}:meta` Redis key with `{profile, cid_count, domains, status, progress, created_at, completed_at}`
- [x] 2.4 Add unit tests for trace_job_service (13 tests: enqueue, status, result, worker execution, flatten)
## 3. Async API Endpoints
- [x] 3.1 Modify `events()` in `trace_routes.py`: when `len(container_ids) > TRACE_ASYNC_CID_THRESHOLD` and async available, call `enqueue_trace_events_job()` and return HTTP 202
- [x] 3.2 Add `GET /api/trace/job/<job_id>` endpoint: return job status from `get_job_status()`
- [x] 3.3 Add `GET /api/trace/job/<job_id>/result` endpoint: return job result from `get_job_result()` with optional `domain`, `offset`, `limit` query params
- [x] 3.4 Add rate limiting to job status/result endpoints (60 req/60s)
- [x] 3.5 Add unit tests for async endpoints (8 tests: async routing, sync fallback, 413 fallback, job status/result)
## 4. Deployment
- [x] 4.1 Create `deploy/mes-dashboard-trace-worker.service` systemd unit (MemoryHigh=3G, MemoryMax=4G)
- [x] 4.2 Update `scripts/start_server.sh`: add `start_rq_worker`/`stop_rq_worker`/`rq_worker_status` functions
- [x] 4.3 Update `scripts/deploy.sh`: add trace worker systemd install instructions
- [x] 4.4 Update `.env.example`: uncomment and add `TRACE_WORKER_ENABLED`, `TRACE_ASYNC_CID_THRESHOLD`, `TRACE_JOB_TTL_SECONDS`, `TRACE_JOB_TIMEOUT_SECONDS`, `TRACE_WORKER_COUNT`, `TRACE_WORKER_QUEUE`
## 5. Frontend Integration
- [x] 5.1 Modify `useTraceProgress.js`: detect async response (`eventsPayload.async === true`), switch to job polling mode
- [x] 5.2 Add `pollJobUntilComplete()` helper: poll `GET /api/trace/job/{job_id}` every 3s, max 30 minutes
- [x] 5.3 Add `job_progress` reactive state for UI: `{ active, job_id, status, elapsed_seconds, progress }`
- [x] 5.4 Add error handling: job failed (`JOB_FAILED`), polling timeout (`JOB_POLL_TIMEOUT`), abort support
## 6. Verification
- [x] 6.1 Run `python -m pytest tests/ -v` — 1090 passed, 152 skipped
- [x] 6.2 Run `cd frontend && npm run build` — frontend builds successfully
- [x] 6.3 Verify rq installed: `python -c "import rq; print(rq.VERSION)"` → 1.16.2

View File

@@ -0,0 +1,174 @@
## Context
2026-02-25 生產環境 OOM crash 時間線:
```
13:18:15 seed-resolve (read_sql_df_slow): 525K rows → 70K lots (38.95s)
13:20:12 lineage (read_sql_df_slow): 114K CIDs, 54MB JSON (65s)
13:20:16 events (read_sql_df_slow): 2 domains × 115 batches × 2 workers
13:20:16 cursor.fetchall() 開始累積 rows → DataFrame → dict → grouped
每個 domain 同時持有 ~3 份完整資料副本
峰值記憶體: (fetchall rows + DataFrame + grouped dict) × 2 domains ≈ 4-6 GB
13:37:47 OOM SIGKILL — 7GB VM, 0 swap
```
pool 隔離(前一個 change解決了連線互搶問題但 events 階段的記憶體使用才是 OOM 根因。
目前 `read_sql_df_slow` 使用 `cursor.fetchall()` 一次載入全部結果到 Python list
然後建 `pd.DataFrame`,再 `iterrows()` + `to_dict()` 轉成 dict list。
114K CIDs 的 upstream_history domain 可能回傳 100 萬+ rows
每份副本數百 MB3-4 份同時存在就超過 VM 記憶體。
## Goals / Non-Goals
**Goals:**
- 防止大查詢直接 OOM 殺死整台 VMadmission control
- 降低 events 階段峰值記憶體 60-70%fetchmany + 跳過 DataFrame
- 保護 host OS 穩定性systemd MemoryMax + workers 降為 2
- 不改變現有 API 回傳格式(對前端透明)
- 更新部署文件和 env 設定
**Non-Goals:**
- 不引入非同步任務佇列(提案 2 範圍)
- 不修改 lineage 階段54MB 在可接受範圍)
- 不修改前端(提案 3 範圍)
- 不限制使用者查詢範圍(日期/站別由使用者決定)
## Decisions
### D1: Admission Control 閾值與行為
**決策**:在 trace events endpoint 加入 CID 數量上限判斷,**依 profile 區分**。
| Profile | CID 數量 | 行為 |
|---------|-----------|------|
| `query_tool` / `query_tool_reverse` | ≤ 50,000 | 正常同步處理 |
| `query_tool` / `query_tool_reverse` | > 50,000 | 回 HTTP 413實務上不會發生 |
| `mid_section_defect` | 任意 | **不設硬限**,正常處理 + log warningCID > 50K 時) |
**env var**`TRACE_EVENTS_CID_LIMIT`(預設 50000僅對非 MSD profile 生效)
**MSD 不設硬限的理由**
- MSD 報廢追溯是聚合統計pareto/表格不渲染追溯圖CID 數量多寡不影響可讀性
- 漏掉 CID 會導致報廢數量統計失準,資料完整性至關重要
- 114K CIDs 是真實業務場景TMTT 站 5 個月),不能拒絕
- OOM 風險由 systemd `MemoryMax=6G` 保護 host OSservice 被殺但 VM 存活,自動重啟)
- 提案 2 實作後MSD 大查詢自動走 async job記憶體問題根本解決
**query_tool 設 50K 上限的理由**
- 追溯圖超過數千節點已無法閱讀50K 是極寬鬆的安全閥
- 實務上 query_tool seed 通常 1-50 lots → lineage 後幾百到幾千 CIDs
**替代方案**
- 全 profile 統一上限 → MSD 被擋住,報廢統計不完整 → rejected
- 無上限 + 只靠 fetchmany → MSD 接受此風險(有 MemoryMax 保護)→ adopted for MSD
- 上限設太低(如 10K→ 影響正常 MSD 查詢(通常 5K-30K CIDs→ rejected
### D2: fetchmany 取代 fetchall
**決策**`read_sql_df_slow` 新增 `fetchmany` 模式,不建 DataFrame直接回傳 iterator。
```python
def read_sql_df_slow_iter(sql, params=None, timeout_seconds=None, batch_size=5000):
"""Yield batches of (columns, rows) without building DataFrame."""
# ... connect, execute ...
columns = [desc[0].upper() for desc in cursor.description]
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
yield columns, rows
# ... cleanup in finally ...
```
**env var**`DB_SLOW_FETCHMANY_SIZE`(預設 5000
**理由**
- `fetchall()` 強制全量 materialization
- `fetchmany(5000)` 每次只持有 5000 rows 在記憶體
- 不建 DataFrame 省去 pandas overheadindex、dtype inference、NaN handling
- EventFetcher 可以 yield 完一批就 group 到結果 dict釋放 batch
**trade-off**
- `read_sql_df_slow`(回傳 DataFrame保留不動新增 `read_sql_df_slow_iter`
- 只有 EventFetcher 使用 iter 版本;其他 service 繼續用 DataFrame 版本
- 這樣不影響任何既有 consumer
### D3: EventFetcher 逐批 group 策略
**決策**`_fetch_batch` 改用 `read_sql_df_slow_iter`,每 fetchmany batch 立刻 group 到 `grouped` dict。
```python
def _fetch_batch(batch_ids):
builder = QueryBuilder()
builder.add_in_condition(filter_column, batch_ids)
sql = EventFetcher._build_domain_sql(domain, builder.get_conditions_sql())
for columns, rows in read_sql_df_slow_iter(sql, builder.params, timeout_seconds=60):
for row in rows:
record = dict(zip(columns, row))
# sanitize NaN
cid = record.get("CONTAINERID")
if cid:
grouped[cid].append(record)
# rows 離開 scope 即被 GC
```
**記憶體改善估算**
| 項目 | 修改前 | 修改後 |
|------|--------|--------|
| cursor buffer | 全量 (100K+ rows) | 5000 rows |
| DataFrame | 全量 | 無 |
| grouped dict | 全量(最終結果) | 全量(最終結果) |
| **峰值** | ~3x 全量 | ~1.05x 全量 |
grouped dict 仍然是全量,但省去了 fetchall list + DataFrame 的兩份副本。
對於 50K CIDs × 10 events = 500K records從 ~1.5GB 降到 ~500MB。
### D4: trace_routes 避免雙份持有
**決策**events endpoint 中 `raw_domain_results` 直接複用為 `results` 的來源,
`_flatten_domain_records` 在建完 flat list 後立刻 `del events_by_cid`
目前的問題:
```python
raw_domain_results[domain] = events_by_cid # 持有 reference
rows = _flatten_domain_records(events_by_cid) # 建新 list
results[domain] = {"data": rows, "count": len(rows)}
# → events_by_cid 和 rows 同時存在
```
修改後:
```python
events_by_cid = future.result()
rows = _flatten_domain_records(events_by_cid)
results[domain] = {"data": rows, "count": len(rows)}
if is_msd:
raw_domain_results[domain] = events_by_cid # MSD 需要 group-by-CID 結構
else:
del events_by_cid # 非 MSD 立刻釋放
```
### D5: Gunicorn workers 降為 2 + systemd MemoryMax
**決策**
- `.env.example``GUNICORN_WORKERS` 預設改為 2
- `deploy/mes-dashboard.service` 加入 `MemoryHigh=5G``MemoryMax=6G`
**理由**
- 4 workers × 大查詢 = 記憶體競爭嚴重
- 2 workers × 4 threads = 8 request threads足夠處理並行請求
- `MemoryHigh=5G`:超過後 kernel 開始 reclaim但不殺進程
- `MemoryMax=6G`:硬限,超過直接 OOM kill service保護 host OS
- 保留 1GB 給 OS + Redis + 其他服務
## Risks / Trade-offs
| 風險 | 緩解措施 |
|------|---------|
| 50K CID 上限可能擋住合理查詢 | env var 可調;提案 2 實作後改走 async |
| fetchmany iterator 模式下 cursor 持有時間更長 | timeout_seconds=60 限制semaphore 限制並行 |
| grouped dict 最終仍全量 | 這是 API contract需回傳所有結果提案 3 的 streaming 才能根本解決 |
| workers=2 降低並行處理能力 | 歷史頁查詢是 semaphore 限制的,降 workers 主要影響即時頁 throughput但即時頁很輕量 |
| MemoryMax kill service 會中斷所有在線使用者 | systemd Restart=always 自動重啟;比 host OS crash 好得多 |

View File

@@ -0,0 +1,39 @@
## Why
2026-02-25 生產環境 trace pipeline 處理 114K CIDsTMTT 站 + 5 個月日期範圍)時,
worker 被 OOM SIGKILL7GB VM無 swap。pool 隔離已完成,連線不再互搶,
但 events 階段的記憶體使用是真正瓶頸:
1. `cursor.fetchall()` 一次載入全部 rows數十萬筆
2. `pd.DataFrame(rows)` 複製一份
3. `df.iterrows()` + `row.to_dict()` 再一份
4. `grouped[cid].append(record)` 累積到最終 dict
5. `raw_domain_results[domain]` + `results[domain]["data"]` 在 trace_routes 同時持有雙份
114K CIDs × 2 domains峰值同時存在 3-4 份完整資料副本,每份數百 MB → 2-4 GB 單一 domain。
7GB VM4 workers完全無法承受。
## What Changes
- **Admission control**trace events endpoint 加 CID 數量上限判斷,超過閾值回 HTTP 413
- **分批處理**`read_sql_df_slow` 改用 `cursor.fetchmany()` 取代 `fetchall()`,不建 DataFrame
- **EventFetcher 逐批 group**:每批 fetch 完立刻 group 到結果 dict釋放 batch 記憶體
- **trace_routes 避免雙份持有**`raw_domain_results``results` 合併為單一資料結構
- **Gunicorn workers 降為 2**:降低單機記憶體競爭
- **systemd MemoryMax**:加 cgroup 記憶體保護,避免 OOM 殺死整台 VM
- **更新 .env.example**:新增 `TRACE_EVENTS_CID_LIMIT``DB_SLOW_FETCHMANY_SIZE` 等 env 文件
- **更新 deploy/mes-dashboard.service**:加入 `MemoryHigh``MemoryMax`
## Capabilities
### Modified Capabilities
- `trace-staged-api`: events endpoint 加入 admission controlCID 上限)
- `event-fetcher-unified`: 分批 group 記憶體優化,取消 DataFrame 中間層
## Impact
- **後端核心**database.pyfetchmany、event_fetcher.py逐批 group、trace_routes.pyadmission control + 記憶體管理)
- **部署設定**gunicorn.conf.py、.env.example、deploy/mes-dashboard.service
- **不影響**:前端、即時監控頁、其他 servicereject_history、hold_history 等)
- **前置條件**trace-pipeline-pool-isolation已完成

View File

@@ -0,0 +1,15 @@
## MODIFIED Requirements
### Requirement: EventFetcher SHALL use streaming fetch for batch queries
`EventFetcher._fetch_batch` SHALL use `read_sql_df_slow_iter` (fetchmany-based iterator) instead of `read_sql_df` (fetchall + DataFrame) to reduce peak memory usage.
#### Scenario: Batch query memory optimization
- **WHEN** EventFetcher executes a batch query for a domain
- **THEN** the query SHALL use `cursor.fetchmany(batch_size)` (env: `DB_SLOW_FETCHMANY_SIZE`, default: 5000) instead of `cursor.fetchall()`
- **THEN** rows SHALL be converted directly to dicts via `dict(zip(columns, row))` without building a DataFrame
- **THEN** each fetchmany batch SHALL be grouped into the result dict immediately, allowing the batch rows to be garbage collected
#### Scenario: Existing API contract preserved
- **WHEN** EventFetcher.fetch_events() returns results
- **THEN** the return type SHALL remain `Dict[str, List[Dict[str, Any]]]` (grouped by CONTAINERID)
- **THEN** the result SHALL be identical to the previous DataFrame-based implementation

View File

@@ -0,0 +1,19 @@
## MODIFIED Requirements
### Requirement: Trace events endpoint SHALL manage memory for large queries
The events endpoint SHALL proactively release memory after processing large CID sets.
#### Scenario: Admission control for non-MSD profiles
- **WHEN** the events endpoint receives a non-MSD profile request with `container_ids` count exceeding `TRACE_EVENTS_CID_LIMIT` (env: `TRACE_EVENTS_CID_LIMIT`, default: 50000)
- **THEN** the endpoint SHALL return HTTP 413 with `{ "error": "...", "code": "CID_LIMIT_EXCEEDED", "cid_count": N, "limit": M }`
- **THEN** Oracle DB connection pool SHALL NOT be consumed
#### Scenario: MSD profile bypasses CID hard limit
- **WHEN** the events endpoint receives a `mid_section_defect` profile request regardless of CID count
- **THEN** the endpoint SHALL proceed with normal processing (no CID hard limit)
- **THEN** if CID count exceeds 50000, the endpoint SHALL log a warning with `cid_count` for monitoring
#### Scenario: Non-MSD profile avoids double memory retention
- **WHEN** a non-MSD events request completes domain fetching
- **THEN** the `events_by_cid` reference SHALL be deleted immediately after `_flatten_domain_records`
- **THEN** only the flattened `results` dict SHALL remain in memory

View File

@@ -0,0 +1,37 @@
## 1. Admission Control (profile-aware)
- [x] 1.1 Add `TRACE_EVENTS_CID_LIMIT` env var (default 50000) to `trace_routes.py`
- [x] 1.2 Add CID count check in `events()` endpoint: for non-MSD profiles, if `len(container_ids) > TRACE_EVENTS_CID_LIMIT`, return HTTP 413 with `{ "code": "CID_LIMIT_EXCEEDED", "cid_count": N, "limit": M }`
- [x] 1.3 For MSD profile: bypass CID hard limit, log warning when CID count > `TRACE_EVENTS_CID_LIMIT` (default 50000)
- [x] 1.4 Add unit tests: non-MSD CID > limit → 413; MSD CID > limit → proceeds normally
## 2. Batch Fetch (fetchmany) in database.py
- [x] 2.1 Add `read_sql_df_slow_iter(sql, params, timeout_seconds, batch_size)` generator function to `database.py` that yields `(columns, rows)` tuples using `cursor.fetchmany(batch_size)`
- [x] 2.2 Add `DB_SLOW_FETCHMANY_SIZE` to `get_db_runtime_config()` (default 5000)
- [x] 2.3 Add unit test for `read_sql_df_slow_iter` (mock cursor, verify fetchmany calls and yields)
## 3. EventFetcher Memory Optimization
- [x] 3.1 Modify `_fetch_batch` in `event_fetcher.py` to use `read_sql_df_slow_iter` instead of `read_sql_df` — iterate rows directly, skip DataFrame, group to `grouped` dict immediately
- [x] 3.2 Update `_sanitize_record` to work with `dict(zip(columns, row))` instead of `row.to_dict()`
- [x] 3.3 Add unit test verifying EventFetcher uses `read_sql_df_slow_iter` import
- [x] 3.4 Update existing EventFetcher tests (mock `read_sql_df_slow_iter` instead of `read_sql_df`)
## 4. trace_routes Memory Optimization
- [x] 4.1 Modify events endpoint: only keep `raw_domain_results[domain]` for MSD profile; for non-MSD, `del events_by_cid` after flattening
- [x] 4.2 Verify existing `del raw_domain_results` and `gc.collect()` logic still correct after refactor
## 5. Deployment Configuration
- [x] 5.1 Update `.env.example`: add `TRACE_EVENTS_CID_LIMIT`, `DB_SLOW_FETCHMANY_SIZE` with descriptions
- [x] 5.2 Update `.env.example`: change `GUNICORN_WORKERS` default comment to recommend 2 for ≤ 8GB RAM
- [x] 5.3 Update `.env.example`: change `TRACE_EVENTS_MAX_WORKERS` and `EVENT_FETCHER_MAX_WORKERS` default to 2
- [x] 5.4 Update `deploy/mes-dashboard.service`: add `MemoryHigh=5G` and `MemoryMax=6G`
- [x] 5.5 Update `deploy/mes-dashboard.service`: add comment explaining memory limits
## 6. Verification
- [x] 6.1 Run `python -m pytest tests/ -v` — all existing tests pass (1069 passed, 152 skipped)
- [x] 6.2 Verify `.env.example` env var documentation is consistent with code defaults

View File

@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-02-25

View File

@@ -0,0 +1,140 @@
## Context
提案 2trace-async-job-queue讓大查詢在獨立 worker 中執行,
但結果仍然全量 materialize 到 Redisjob result和前端記憶體。
114K CIDs × 2 domains 的結果 JSON 可達 200-500MB
- Worker 記憶體grouped dict ~500MB + JSON serialize ~500MB = ~1GB 峰值
- RedisSETEX 500MB 的 key 耗時 5-10s阻塞其他操作
- 前端:瀏覽器解析 500MB JSON freeze UI 數十秒
串流回傳讓 server 逐批產生、前端逐批消費,記憶體使用只與每批大小成正比。
## Goals / Non-Goals
**Goals:**
- Job 結果以 NDJSON 串流回傳,避免全量 materialize
- EventFetcher 支援 iterator 模式,逐批 yield 結果
- 前端用 ReadableStream 逐行解析,逐批渲染
- 結果也支援分頁 API給不支援串流的 consumer 使用)
**Non-Goals:**
- 不改動同步路徑CID < 閾值仍走現有 JSON 回傳
- 不做 WebSocketNDJSON over HTTP 更簡單更通用
- 不做 Server-Sent EventsSSE 只支援 text/event-stream不適合大 payload
- 不修改 MSD aggregationaggregation 需要全量資料但結果較小
## Decisions
### D1: NDJSON 格式
**決策**使用 Newline Delimited JSONNDJSON作為串流格式
```
Content-Type: application/x-ndjson
{"type":"meta","job_id":"abc123","domains":["history","materials"],"cid_count":114892}
{"type":"domain_start","domain":"history","batch":1,"total_batches":23}
{"type":"records","domain":"history","batch":1,"data":[...5000 records...]}
{"type":"records","domain":"history","batch":2,"data":[...5000 records...]}
...
{"type":"domain_end","domain":"history","total_records":115000}
{"type":"domain_start","domain":"materials","batch":1,"total_batches":12}
...
{"type":"aggregation","data":{...}}
{"type":"complete","elapsed_seconds":285}
```
**env var**`TRACE_STREAM_BATCH_SIZE`預設 5000 records/batch
**理由**
- NDJSON 是業界標準串流 JSON 格式ElasticsearchBigQueryGitHub API 都用
- 每行是獨立 JSON前端可逐行 parse不需要等整個 response
- 5000 records/batch 2-5MB瀏覽器可即時渲染
- HTTP/1.1 chunked transfer 完美搭配
### D2: EventFetcher iterator 模式
**決策**新增 `fetch_events_iter()` 方法yield 每批 grouped records
```python
@staticmethod
def fetch_events_iter(container_ids, domain, batch_size=5000):
"""Yield dicts of {cid: [records]} in batches."""
# ... same SQL building logic ...
for oracle_batch_ids in batches:
for columns, rows in read_sql_df_slow_iter(sql, params):
batch_grouped = defaultdict(list)
for row in rows:
record = dict(zip(columns, row))
cid = record.get("CONTAINERID")
if cid:
batch_grouped[cid].append(record)
yield dict(batch_grouped)
```
**理由**
- `fetch_events()` 共存不影響同步路徑
- 每次 yield 只持有一個 fetchmany batch grouped 結果
- Worker 收到 yield 後立刻序列化寫出不累積
### D3: 結果分頁 API
**決策**提供 REST 分頁 API 作為 NDJSON 的替代方案
```
GET /api/trace/job/{job_id}/result?domain=history&offset=0&limit=5000
```
**回應格式**
```json
{
"domain": "history",
"offset": 0,
"limit": 5000,
"total": 115000,
"data": [... 5000 records ...]
}
```
**理由**
- 某些 consumer如外部系統不支援 NDJSON 串流
- 分頁 API 是標準 REST pattern
- 結果仍儲存在 Redis但按 domain key每個 key 5000 records 5MB
### D4: 前端 ReadableStream 消費
```javascript
/**
 * Stream an NDJSON response and invoke onChunk(parsedLine) for each line,
 * so memory stays proportional to one batch instead of the full payload.
 *
 * @param {string} url - NDJSON endpoint to fetch.
 * @param {(chunk: any) => void} onChunk - called once per parsed JSON line.
 * @throws {SyntaxError} if a line is not valid JSON.
 */
async function consumeNDJSON(url, onChunk) {
  const response = await fetch(url)
  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() // keep the incomplete trailing line
    for (const line of lines) {
      if (line.trim()) onChunk(JSON.parse(line))
    }
  }
  // Flush: emit any bytes the streaming decoder still holds, then parse the
  // final line — servers may omit the trailing newline, and the original
  // implementation silently dropped that last record.
  buffer += decoder.decode()
  if (buffer.trim()) onChunk(JSON.parse(buffer))
}
```
**理由**
- ReadableStream 是瀏覽器原生 API無需額外依賴
- 逐行 parse 記憶體使用恆定只與 batch_size 成正比
- 可邊收邊渲染使用者體驗好
## Risks / Trade-offs
| 風險 | 緩解措施 |
|------|---------|
| NDJSON 不支援 HTTP 壓縮 | Flask 可配 gzip middleware每行 5000 records 壓縮率高 |
| 中途斷線需重新開始 | 分頁 API 可從斷點繼續取NDJSON 用於一次性消費 |
| 前端需要處理部分結果渲染 | 表格元件改用 virtual scroll既有 vue-virtual-scroller |
| MSD aggregation 仍需全量資料 | aggregation worker 內部完成只串流最終結果較小 |
| 結果按 domain key 增加 Redis key 數量 | TTL 清理 + key prefix 隔離 |

View File

@@ -0,0 +1,39 @@
## Why
即使有非同步 job提案 2處理大查詢結果 materialize 仍然是記憶體瓶頸:
1. **job result 全量 JSON**114K CIDs × 2 domains 的結果 JSON 可達數百 MB
Redis 儲存 + 讀取 + Flask jsonify 序列化,峰值記憶體仍高
2. **前端一次性解析**:瀏覽器解析數百 MB JSON 會 freeze UI
3. **Redis 單 key 限制**:大 value 影響 Redis 效能(阻塞其他操作)
串流回傳NDJSON/分頁)讓 server 逐批產生資料、前端逐批消費,
記憶體使用與 CID 總數解耦,只與每批大小成正比。
## What Changes
- **EventFetcher 支援 iterator 模式**`fetch_events_iter()` yield 每批結果而非累積全部
- **新增 `GET /api/trace/job/{job_id}/stream`**NDJSON 串流回傳 job 結果
- **前端 useTraceProgress 串流消費**:用 `fetch()` + `ReadableStream` 逐行解析 NDJSON
- **結果分頁 API**`GET /api/trace/job/{job_id}/result?domain=history&offset=0&limit=5000`
- **更新 .env.example**`TRACE_STREAM_BATCH_SIZE`
## Capabilities
### New Capabilities
- `trace-streaming-response`: NDJSON 串流回傳 + 結果分頁
### Modified Capabilities
- `event-fetcher-unified`: 新增 iterator 模式(`fetch_events_iter`
- `trace-staged-api`: job result 串流 endpoint
- `progressive-trace-ux`: 前端串流消費 + 逐批渲染
## Impact
- **後端核心**event_fetcher.pyiterator 模式、trace_routes.pystream endpoint
- **前端修改**useTraceProgress.jsReadableStream 消費)
- **部署設定**.env.example`TRACE_STREAM_BATCH_SIZE`
- **不影響**同步路徑CID < 閾值仍走現有流程)、其他 service即時監控頁
- **前置條件**trace-async-job-queue提案 2

View File

@@ -0,0 +1,14 @@
## ADDED Requirements
### Requirement: EventFetcher SHALL support iterator mode for streaming
`EventFetcher.fetch_events_iter()` SHALL yield batched results for streaming consumption.
#### Scenario: Iterator mode yields batches
- **WHEN** `fetch_events_iter(container_ids, domain, batch_size)` is called
- **THEN** it SHALL yield `Dict[str, List[Dict]]` batches (grouped by CONTAINERID)
- **THEN** each yielded batch SHALL contain results from one `cursor.fetchmany()` call
- **THEN** memory usage SHALL be proportional to `batch_size`, not total result count
#### Scenario: Iterator mode cache behavior
- **WHEN** `fetch_events_iter` is used for large CID sets (> CACHE_SKIP_CID_THRESHOLD)
- **THEN** per-domain cache SHALL be skipped (consistent with `fetch_events` behavior)

View File

@@ -0,0 +1,22 @@
## ADDED Requirements
### Requirement: Trace API SHALL expose NDJSON stream endpoint for job results
`GET /api/trace/job/{job_id}/stream` SHALL return job results as NDJSON (Newline Delimited JSON) stream.
#### Scenario: Stream completed job result
- **WHEN** a client requests stream for a completed job
- **THEN** the endpoint SHALL return `Content-Type: application/x-ndjson`
- **THEN** the response SHALL contain ordered NDJSON lines: `meta``domain_start``records` batches → `domain_end``aggregation` (if applicable) → `complete`
- **THEN** each `records` line SHALL contain at most `TRACE_STREAM_BATCH_SIZE` (env, default: 5000) records
#### Scenario: Stream for non-completed job
- **WHEN** a client requests stream for a non-completed job
- **THEN** the endpoint SHALL return HTTP 409 with `{ "error": "...", "code": "JOB_NOT_COMPLETE" }`
### Requirement: Job result pagination SHALL support domain-level offset/limit
`GET /api/trace/job/{job_id}/result` SHALL support fine-grained pagination per domain.
#### Scenario: Paginated domain result
- **WHEN** a client requests `?domain=history&offset=0&limit=5000`
- **THEN** the endpoint SHALL return only the specified slice of records for that domain
- **THEN** the response SHALL include `total` count for the domain

View File

@@ -0,0 +1,35 @@
## 1. EventFetcher Iterator Mode
- [ ] 1.1 Add `fetch_events_iter(container_ids, domain, batch_size)` static method to `EventFetcher` class: yields `Dict[str, List[Dict]]` batches using `read_sql_df_slow_iter`
- [ ] 1.2 Add unit tests for `fetch_events_iter` (mock read_sql_df_slow_iter, verify batch yields)
## 2. NDJSON Stream Endpoint
- [x] 2.1 Add `GET /api/trace/job/<job_id>/stream` endpoint: returns `Content-Type: application/x-ndjson` with Flask `Response(generate(), mimetype='application/x-ndjson')`
- [x] 2.2 Implement NDJSON generator: yield `meta``domain_start``records` batches → `domain_end``aggregation``complete` lines
- [x] 2.3 Add `TRACE_STREAM_BATCH_SIZE` env var (default 5000)
- [x] 2.4 Modify `execute_trace_events_job()` to store results in chunked Redis keys: `trace:job:{job_id}:result:{domain}:{chunk_idx}`
- [x] 2.5 Add unit tests for NDJSON stream endpoint
## 3. Result Pagination API
- [x] 3.1 Enhance `GET /api/trace/job/<job_id>/result` with `domain`, `offset`, `limit` query params
- [x] 3.2 Implement pagination over chunked Redis keys
- [x] 3.3 Add unit tests for pagination (offset/limit boundary cases)
## 4. Frontend Streaming Consumer
- [x] 4.1 Add `consumeNDJSONStream(url, onChunk)` utility using `ReadableStream`
- [x] 4.2 Modify `useTraceProgress.js`: for async jobs, prefer stream endpoint over full result endpoint
- [x] 4.3 Add progressive rendering: update table data as each NDJSON batch arrives
- [x] 4.4 Add error handling: stream interruption, malformed NDJSON lines
## 5. Deployment
- [x] 5.1 Update `.env.example`: add `TRACE_STREAM_BATCH_SIZE` with description
## 6. Verification
- [x] 6.1 Run `python -m pytest tests/ -v` — all existing tests pass
- [x] 6.2 Run `cd frontend && npm run build` — frontend builds successfully
- [ ] 6.3 Manual test: verify NDJSON stream produces valid output for multi-domain query

View File

@@ -23,6 +23,9 @@ openpyxl>=3.0.0 # Excel file support
redis>=5.0.0,<6.0.0
hiredis>=2.0.0,<4.0.0 # C parser for better Redis performance
# Task Queue (async trace jobs)
rq>=1.16.0,<2.0.0
# HTTP Client
requests>=2.28.0,<3.0.0

View File

@@ -242,9 +242,14 @@ show_next_steps() {
echo " sudo chmod 640 .env"
echo " sudo cp deploy/mes-dashboard.service /etc/systemd/system/"
echo " sudo cp deploy/mes-dashboard-watchdog.service /etc/systemd/system/"
echo " sudo cp deploy/mes-dashboard-trace-worker.service /etc/systemd/system/"
echo " sudo systemctl daemon-reload"
echo " sudo systemctl enable --now mes-dashboard mes-dashboard-watchdog"
echo ""
echo "Optional: enable async trace worker (for large queries)"
echo " # Set TRACE_WORKER_ENABLED=true in .env"
echo " sudo systemctl enable --now mes-dashboard-trace-worker"
echo ""
echo "=========================================="
}

View File

@@ -25,6 +25,9 @@ PORT=$(echo "$DEFAULT_PORT" | cut -d: -f2)
REDIS_ENABLED="${REDIS_ENABLED:-true}"
# Worker watchdog configuration
WATCHDOG_ENABLED="${WATCHDOG_ENABLED:-true}"
# RQ trace worker configuration
TRACE_WORKER_ENABLED="${TRACE_WORKER_ENABLED:-false}"
TRACE_WORKER_QUEUE="${TRACE_WORKER_QUEUE:-trace-events}"
# Colors for output
RED='\033[0;31m'
@@ -73,6 +76,8 @@ resolve_runtime_paths() {
WATCHDOG_PID_FILE="${WATCHDOG_PID_FILE:-${WATCHDOG_RUNTIME_DIR}/gunicorn.pid}"
WATCHDOG_STATE_FILE="${WATCHDOG_STATE_FILE:-${WATCHDOG_RUNTIME_DIR}/mes_dashboard_restart_state.json}"
WATCHDOG_PROCESS_PID_FILE="${WATCHDOG_PROCESS_PID_FILE:-${WATCHDOG_RUNTIME_DIR}/worker_watchdog.pid}"
RQ_WORKER_PID_FILE="${WATCHDOG_RUNTIME_DIR}/rq_trace_worker.pid"
RQ_WORKER_LOG="${LOG_DIR}/rq_worker.log"
PID_FILE="${WATCHDOG_PID_FILE}"
export WATCHDOG_RUNTIME_DIR WATCHDOG_RESTART_FLAG WATCHDOG_PID_FILE WATCHDOG_STATE_FILE WATCHDOG_PROCESS_PID_FILE
}
@@ -585,6 +590,126 @@ stop_watchdog() {
return 0
}
# ============================================================
# RQ Trace Worker Management Functions
# ============================================================
# Locate the PID of the running RQ trace worker, if any.
# Order of checks:
#   1. PID file — trusted only if the recorded process is still alive;
#      a stale file is removed.
#   2. pgrep fallback — discovers a worker started outside this script
#      (matched on the queue name) and re-seeds the PID file.
# Prints the PID and returns 0 on success; returns 1 when no worker is running.
get_rq_worker_pid() {
    if [ -f "$RQ_WORKER_PID_FILE" ]; then
        local pid
        pid=$(cat "$RQ_WORKER_PID_FILE" 2>/dev/null || true)
        if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
            echo "$pid"
            return 0
        fi
        # Stale PID file: recorded process is gone, drop the file.
        rm -f "$RQ_WORKER_PID_FILE"
    fi
    # Fall back to process discovery. The [r] bracket trick prevents pgrep
    # from matching its own command line.
    local discovered_pid
    discovered_pid=$(pgrep -f "[r]q worker.*${TRACE_WORKER_QUEUE}" 2>/dev/null | head -1 || true)
    if [ -n "${discovered_pid}" ] && kill -0 "${discovered_pid}" 2>/dev/null; then
        # Re-seed the PID file so subsequent calls take the fast path.
        echo "${discovered_pid}" > "$RQ_WORKER_PID_FILE"
        echo "${discovered_pid}"
        return 0
    fi
    return 1
}
# Exit 0 when an RQ trace worker process is alive, non-zero otherwise.
is_rq_worker_running() {
    get_rq_worker_pid > /dev/null 2>&1
}
# Start the background RQ trace worker if enabled and not already running.
# No-ops (exit 0) when: the feature flag is off, Redis is disabled, a worker
# is already alive, or the rq package is missing — so the optional worker
# never blocks main server startup.
# Returns 1 only when an actual launch attempt fails.
start_rq_worker() {
    if ! is_enabled "${TRACE_WORKER_ENABLED:-false}"; then
        log_info "RQ trace worker is disabled (TRACE_WORKER_ENABLED=${TRACE_WORKER_ENABLED:-false})"
        return 0
    fi
    if [ "$REDIS_ENABLED" != "true" ]; then
        log_warn "Redis is disabled; cannot start RQ trace worker"
        return 0
    fi
    if is_rq_worker_running; then
        local pid
        pid=$(get_rq_worker_pid)
        log_success "RQ trace worker already running (PID: ${pid})"
        return 0
    fi
    # Check if rq is installed
    if ! python -c "import rq" 2>/dev/null; then
        log_warn "rq package not installed; skip trace worker (pip install rq)"
        return 0
    fi
    log_info "Starting RQ trace worker (queue: ${TRACE_WORKER_QUEUE})..."
    local redis_url="${REDIS_URL:-redis://localhost:6379/0}"
    # Detach from the controlling terminal: prefer setsid (new session) so the
    # worker survives shell exit; fall back to nohup where setsid is missing.
    if command -v setsid >/dev/null 2>&1; then
        setsid rq worker "${TRACE_WORKER_QUEUE}" --url "${redis_url}" >> "$RQ_WORKER_LOG" 2>&1 < /dev/null &
    else
        nohup rq worker "${TRACE_WORKER_QUEUE}" --url "${redis_url}" >> "$RQ_WORKER_LOG" 2>&1 < /dev/null &
    fi
    local pid=$!
    echo "$pid" > "$RQ_WORKER_PID_FILE"
    # Give the process a moment to crash on startup errors (bad Redis URL,
    # import failures) before declaring success.
    sleep 1
    if kill -0 "$pid" 2>/dev/null; then
        log_success "RQ trace worker started (PID: ${pid})"
        return 0
    fi
    rm -f "$RQ_WORKER_PID_FILE"
    log_error "Failed to start RQ trace worker"
    return 1
}
# Stop the RQ trace worker: graceful SIGTERM with a 10-second wait, then
# escalate to SIGKILL. Always removes the PID file; returns 1 only if the
# process somehow survives SIGKILL.
stop_rq_worker() {
    if ! is_rq_worker_running; then
        # Nothing running; clean up any stale PID file.
        rm -f "$RQ_WORKER_PID_FILE"
        return 0
    fi
    local pid
    pid=$(get_rq_worker_pid)
    log_info "Stopping RQ trace worker (PID: ${pid})..."
    # Graceful shutdown first — lets rq finish or abort the current job cleanly.
    kill -TERM "$pid" 2>/dev/null || true
    local count=0
    while kill -0 "$pid" 2>/dev/null && [ $count -lt 10 ]; do
        sleep 1
        count=$((count + 1))
    done
    if kill -0 "$pid" 2>/dev/null; then
        # Still alive after the grace period: force kill.
        kill -9 "$pid" 2>/dev/null || true
        sleep 1
    fi
    rm -f "$RQ_WORKER_PID_FILE"
    if kill -0 "$pid" 2>/dev/null; then
        log_error "Failed to stop RQ trace worker"
        return 1
    fi
    log_success "RQ trace worker stopped"
    return 0
}
# Print a one-line colored status entry for the RQ trace worker.
rq_worker_status() {
    if ! is_enabled "${TRACE_WORKER_ENABLED:-false}"; then
        echo -e " RQ Worker:${YELLOW} DISABLED${NC}"
        return 0
    fi
    if ! is_rq_worker_running; then
        echo -e " RQ Worker:${RED} STOPPED${NC}"
        return 0
    fi
    local pid
    pid=$(get_rq_worker_pid)
    echo -e " RQ Worker:${GREEN} RUNNING${NC} (PID: ${pid}, queue: ${TRACE_WORKER_QUEUE})"
}
do_start() {
local foreground=false
@@ -662,6 +787,7 @@ do_start() {
log_info "Access URL: http://localhost:${PORT}"
log_info "Logs: ${LOG_DIR}/"
start_watchdog || return 1
start_rq_worker
echo "[$(timestamp)] Server started (PID: ${pid})" >> "$STARTUP_LOG"
else
log_error "Failed to start server"
@@ -723,6 +849,7 @@ do_stop() {
fi
fi
stop_rq_worker
stop_watchdog
}
@@ -768,6 +895,7 @@ do_status() {
else
echo -e " Watchdog:${YELLOW} DISABLED${NC}"
fi
rq_worker_status
if is_running; then
echo ""

View File

@@ -201,6 +201,10 @@ def get_db_runtime_config(refresh: bool = False) -> Dict[str, Any]:
"DB_SLOW_MAX_CONCURRENT",
config_class.DB_SLOW_MAX_CONCURRENT,
),
"slow_fetchmany_size": _from_app_or_env_int(
"DB_SLOW_FETCHMANY_SIZE",
5000,
),
}
return _DB_RUNTIME_CONFIG.copy()
@@ -696,6 +700,104 @@ def read_sql_df_slow(
sem.release()
def read_sql_df_slow_iter(
    sql: str,
    params: Optional[Dict[str, Any]] = None,
    timeout_seconds: Optional[int] = None,
    batch_size: Optional[int] = None,
):
    """Execute a slow SQL query and yield batches of (columns, rows) without DataFrame.

    Uses cursor.fetchmany() to avoid full materialization in memory.
    Each yielded batch is a tuple of (columns: List[str], rows: List[tuple]).

    Shares the slow-query semaphore and active counter with read_sql_df_slow,
    so iterator queries count against the same concurrency limit. Because this
    is a generator, the semaphore and the dedicated connection are held until
    the iterator is exhausted or closed — consumers should drain or close it
    promptly. The ``finally`` block also runs on GeneratorExit, so abandoning
    the iterator still releases the connection and the semaphore slot.

    Args:
        sql: SQL query string with Oracle bind variables.
        params: Optional dict of parameter values.
        timeout_seconds: Call timeout in seconds. None = use config default.
        batch_size: Number of rows per fetchmany call. None = use
            DB_SLOW_FETCHMANY_SIZE from config (default 5000).

    Yields:
        Tuple[List[str], List[tuple]]: uppercased column names and at most
        ``batch_size`` raw cursor rows per iteration.

    Raises:
        RuntimeError: if a slow-query concurrency slot cannot be acquired
            within 60 seconds.
        Exception: any oracledb error from connect/execute/fetch, re-raised
            after logging.
    """
    global _SLOW_QUERY_ACTIVE
    runtime = get_db_runtime_config()
    # Resolve the call timeout: explicit argument wins, else configured default.
    if timeout_seconds is None:
        timeout_ms = runtime["slow_call_timeout_ms"]
    else:
        timeout_ms = timeout_seconds * 1000
    if batch_size is None:
        batch_size = runtime["slow_fetchmany_size"]
    # Admission control: bound the number of concurrent slow queries.
    sem = _get_slow_query_semaphore()
    acquired = sem.acquire(timeout=60)
    if not acquired:
        raise RuntimeError(
            "Slow-query concurrency limit reached; try again later"
        )
    with _SLOW_QUERY_LOCK:
        _SLOW_QUERY_ACTIVE += 1
        active = _SLOW_QUERY_ACTIVE
    logger.info("Slow query iter starting (active=%s, timeout_ms=%s, batch_size=%s)", active, timeout_ms, batch_size)
    start_time = time.time()
    conn = None
    total_rows = 0
    try:
        # Dedicated (non-pooled) connection so call_timeout can be set per query.
        conn = oracledb.connect(
            **DB_CONFIG,
            tcp_connect_timeout=runtime["tcp_connect_timeout"],
            retry_count=runtime["retry_count"],
            retry_delay=runtime["retry_delay"],
        )
        conn.call_timeout = timeout_ms
        with _DIRECT_CONN_LOCK:
            global _DIRECT_CONN_COUNTER
            _DIRECT_CONN_COUNTER += 1
        logger.debug(
            "Slow-query iter connection established (call_timeout_ms=%s)", timeout_ms
        )
        cursor = conn.cursor()
        cursor.execute(sql, params or {})
        columns = [desc[0].upper() for desc in cursor.description]
        # Stream rows in fetchmany-sized batches; each yielded batch can be
        # garbage-collected by the consumer before the next one is fetched.
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            total_rows += len(rows)
            yield columns, rows
        cursor.close()
        elapsed = time.time() - start_time
        # Same slow-query logging convention as read_sql_df_slow.
        if elapsed > 1.0:
            sql_preview = sql.strip().replace('\n', ' ')[:100]
            logger.warning(f"Slow query iter ({elapsed:.2f}s, rows={total_rows}): {sql_preview}...")
        else:
            logger.debug(f"Query iter completed in {elapsed:.3f}s, rows={total_rows}")
    except Exception as exc:
        elapsed = time.time() - start_time
        ora_code = _extract_ora_code(exc)
        sql_preview = sql.strip().replace('\n', ' ')[:100]
        logger.error(
            f"Query iter failed after {elapsed:.2f}s - ORA-{ora_code}: {exc} | SQL: {sql_preview}..."
        )
        raise
    finally:
        # Always close the connection and release the concurrency slot, even
        # when the consumer abandons the generator mid-iteration.
        if conn:
            try:
                conn.close()
            except Exception:
                pass
        with _SLOW_QUERY_LOCK:
            _SLOW_QUERY_ACTIVE -= 1
        sem.release()
# ============================================================
# Table Utilities
# ============================================================

View File

@@ -18,7 +18,7 @@ import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
from flask import Blueprint, jsonify, request
from flask import Blueprint, Response, jsonify, request
from mes_dashboard.core.cache import cache_get, cache_set
from mes_dashboard.core.rate_limit import configured_rate_limit
@@ -31,6 +31,14 @@ from mes_dashboard.services.mid_section_defect_service import (
resolve_trace_seed_lots,
)
from mes_dashboard.services.query_tool_service import resolve_lots
from mes_dashboard.services.trace_job_service import (
TRACE_ASYNC_CID_THRESHOLD,
enqueue_trace_events_job,
get_job_result,
get_job_status,
is_async_available,
stream_job_result_ndjson,
)
logger = logging.getLogger("mes_dashboard.trace_routes")
@@ -38,6 +46,7 @@ trace_bp = Blueprint("trace", __name__, url_prefix="/api/trace")
TRACE_SLOW_THRESHOLD_SECONDS = float(os.getenv('TRACE_SLOW_THRESHOLD_SECONDS', '15'))
TRACE_EVENTS_MAX_WORKERS = int(os.getenv('TRACE_EVENTS_MAX_WORKERS', '2'))
TRACE_EVENTS_CID_LIMIT = int(os.getenv('TRACE_EVENTS_CID_LIMIT', '50000'))
TRACE_CACHE_TTL_SECONDS = 300
PROFILE_QUERY_TOOL = "query_tool"
@@ -87,6 +96,14 @@ _TRACE_EVENTS_RATE_LIMIT = configured_rate_limit(
default_window_seconds=60,
)
_TRACE_JOB_RATE_LIMIT = configured_rate_limit(
bucket="trace-job-status",
max_attempts_env="TRACE_JOB_RATE_LIMIT_MAX_REQUESTS",
window_seconds_env="TRACE_JOB_RATE_LIMIT_WINDOW_SECONDS",
default_max_attempts=60,
default_window_seconds=60,
)
def _json_body() -> Optional[Dict[str, Any]]:
payload = request.get_json(silent=True)
@@ -622,10 +639,49 @@ def events():
400,
)
# Admission control: non-MSD profiles have a hard CID limit to prevent OOM.
# MSD (reject tracing) needs all CIDs for accurate aggregation statistics.
is_msd = (profile == PROFILE_MID_SECTION_DEFECT)
cid_count = len(container_ids)
# Route large queries to async job queue when available.
# Falls through to sync path if RQ is not installed or Redis is down.
if cid_count > TRACE_ASYNC_CID_THRESHOLD and is_async_available():
job_id, err = enqueue_trace_events_job(profile, container_ids, domains, payload)
if job_id is not None:
logger.info(
"trace events routed to async job_id=%s profile=%s cid_count=%s",
job_id, profile, cid_count,
)
return jsonify({
"stage": "events",
"async": True,
"job_id": job_id,
"status_url": f"/api/trace/job/{job_id}",
"stream_url": f"/api/trace/job/{job_id}/stream",
}), 202
logger.warning("trace async enqueue failed cid_count=%s: %s", cid_count, err)
if not is_msd and cid_count > TRACE_EVENTS_CID_LIMIT:
logger.warning(
"trace events CID limit exceeded profile=%s cid_count=%s limit=%s",
profile, cid_count, TRACE_EVENTS_CID_LIMIT,
)
return _error(
"CID_LIMIT_EXCEEDED",
f"container_ids count ({cid_count}) exceeds limit ({TRACE_EVENTS_CID_LIMIT})",
413,
)
if is_msd and cid_count > TRACE_EVENTS_CID_LIMIT:
logger.warning(
"trace events MSD large query proceeding cid_count=%s limit=%s",
cid_count, TRACE_EVENTS_CID_LIMIT,
)
# For MSD profile, skip the events-level cache so that aggregation is
# always recomputed with the current loss_reasons. EventFetcher still
# provides per-domain Redis caching, so raw Oracle queries are avoided.
is_msd = (profile == PROFILE_MID_SECTION_DEFECT)
events_cache_key = _events_cache_key(profile, domains, container_ids)
if not is_msd:
@@ -655,9 +711,11 @@ def events():
domain = futures[future]
try:
events_by_cid = future.result()
raw_domain_results[domain] = events_by_cid
rows = _flatten_domain_records(events_by_cid)
results[domain] = {"data": rows, "count": len(rows)}
if is_msd:
raw_domain_results[domain] = events_by_cid
# Non-MSD: events_by_cid goes out of scope here, no double retention
except Exception as exc:
logger.error("events stage domain failed domain=%s: %s", domain, exc, exc_info=True)
failed_domains.append(domain)
@@ -694,3 +752,62 @@ def events():
gc.collect()
return jsonify(response)
@trace_bp.route("/job/<job_id>", methods=["GET"])
@_TRACE_JOB_RATE_LIMIT
def job_status(job_id: str):
    """Return the current status of an async trace job.

    Responds 404 (JOB_NOT_FOUND) when the job metadata is missing or expired.
    """
    current = get_job_status(job_id)
    if current is not None:
        return jsonify(current)
    return _error("JOB_NOT_FOUND", "job not found or expired", 404)
@trace_bp.route("/job/<job_id>/result", methods=["GET"])
@_TRACE_JOB_RATE_LIMIT
def job_result(job_id: str):
    """Return the result of a completed async trace job.

    Query params:
        domain: optional domain name to return a single domain's records.
        offset / limit: optional pagination within the selected domain
            (0 or omitted means "no slicing").

    Responses:
        200 with the (optionally paginated) result payload.
        404 JOB_NOT_FOUND when the job is unknown/expired,
            JOB_RESULT_EXPIRED when the result keys have expired.
        409 JOB_NOT_COMPLETE while the job is still queued or running.
    """
    status = get_job_status(job_id)
    if status is None:
        return _error("JOB_NOT_FOUND", "job not found or expired", 404)
    # Consistent with job_stream(): only finished jobs expose results.
    if status["status"] != "finished":
        return jsonify({
            "error": {"code": "JOB_NOT_COMPLETE", "message": "job has not completed yet"},
            "status": status["status"],
        }), 409
    domain = request.args.get("domain")
    # Negative values are meaningless for pagination; clamp to 0 ("unset").
    offset = max(0, request.args.get("offset", 0, type=int))
    limit = max(0, request.args.get("limit", 0, type=int))
    result = get_job_result(job_id, domain=domain, offset=offset, limit=limit)
    if result is None:
        return _error("JOB_RESULT_EXPIRED", "job result has expired", 404)
    return jsonify(result)
@trace_bp.route("/job/<job_id>/stream", methods=["GET"])
@_TRACE_JOB_RATE_LIMIT
def job_stream(job_id: str):
    """Stream completed job result as NDJSON for progressive frontend consumption."""
    status = get_job_status(job_id)
    if status is None:
        return _error("JOB_NOT_FOUND", "job not found or expired", 404)
    if status["status"] != "finished":
        body = {
            "error": {"code": "JOB_NOT_COMPLETE", "message": "job has not completed yet"},
            "status": status["status"],
        }
        return jsonify(body), 409
    # X-Accel-Buffering disables reverse-proxy buffering so NDJSON lines
    # reach the client as they are produced rather than in one burst.
    stream_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return Response(
        stream_job_result_ndjson(job_id),
        mimetype="application/x-ndjson",
        headers=stream_headers,
    )

View File

@@ -13,7 +13,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List
from mes_dashboard.core.cache import cache_get, cache_set
from mes_dashboard.core.database import read_sql_df_slow as read_sql_df
from mes_dashboard.core.database import read_sql_df_slow_iter
from mes_dashboard.sql import QueryBuilder, SQLLoader
logger = logging.getLogger("mes_dashboard.event_fetcher")
@@ -238,28 +238,25 @@ class EventFetcher:
filter_column = spec["filter_column"]
match_mode = spec.get("match_mode", "in")
def _fetch_batch(batch_ids):
def _sanitize_value(v):
"""Replace NaN float values with None for JSON-safe serialization."""
if isinstance(v, float) and math.isnan(v):
return None
return v
def _fetch_and_group_batch(batch_ids):
"""Fetch a batch using fetchmany iterator and group into ``grouped``."""
builder = QueryBuilder()
if match_mode == "contains":
builder.add_or_like_conditions(filter_column, batch_ids, position="both")
else:
builder.add_in_condition(filter_column, batch_ids)
sql = EventFetcher._build_domain_sql(domain, builder.get_conditions_sql())
return batch_ids, read_sql_df(sql, builder.params, timeout_seconds=60)
def _sanitize_record(d):
"""Replace NaN/NaT values with None for JSON-safe serialization."""
for k, v in d.items():
if isinstance(v, float) and math.isnan(v):
d[k] = None
return d
def _process_batch_result(batch_ids, df):
if df is None or df.empty:
return
for _, row in df.iterrows():
for columns, rows in read_sql_df_slow_iter(sql, builder.params, timeout_seconds=60):
for row in rows:
record = {k: _sanitize_value(v) for k, v in zip(columns, row)}
if domain == "jobs":
record = _sanitize_record(row.to_dict())
containers = record.get("CONTAINERIDS")
if not isinstance(containers, str) or not containers:
continue
@@ -269,10 +266,10 @@ class EventFetcher:
enriched["CONTAINERID"] = cid
grouped[cid].append(enriched)
continue
cid = row.get("CONTAINERID")
cid = record.get("CONTAINERID")
if not isinstance(cid, str) or not cid:
continue
grouped[cid].append(_sanitize_record(row.to_dict()))
grouped[cid].append(record)
batches = [
normalized_ids[i:i + ORACLE_IN_BATCH_SIZE]
@@ -281,15 +278,13 @@ class EventFetcher:
if len(batches) <= 1:
for batch in batches:
batch_ids, df = _fetch_batch(batch)
_process_batch_result(batch_ids, df)
_fetch_and_group_batch(batch)
else:
with ThreadPoolExecutor(max_workers=min(len(batches), EVENT_FETCHER_MAX_WORKERS)) as executor:
futures = {executor.submit(_fetch_batch, b): b for b in batches}
futures = {executor.submit(_fetch_and_group_batch, b): b for b in batches}
for future in as_completed(futures):
try:
batch_ids, df = future.result()
_process_batch_result(batch_ids, df)
future.result()
except Exception:
logger.error(
"EventFetcher batch query failed domain=%s batch_size=%s",

View File

@@ -0,0 +1,616 @@
# -*- coding: utf-8 -*-
"""Async trace job service using RQ (Redis Queue).
Provides enqueue/status/result functions for long-running trace events queries.
The worker entry point ``execute_trace_events_job`` runs in a separate RQ worker
process — independent of gunicorn — with its own memory space.
"""
from __future__ import annotations
import gc
import json
import logging
import os
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple
from mes_dashboard.core.redis_client import get_key, get_redis_client
logger = logging.getLogger("mes_dashboard.trace_job_service")
# ---------------------------------------------------------------------------
# Configuration from environment
# ---------------------------------------------------------------------------
TRACE_ASYNC_CID_THRESHOLD = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000"))
TRACE_JOB_TTL_SECONDS = int(os.getenv("TRACE_JOB_TTL_SECONDS", "3600"))
TRACE_JOB_TIMEOUT_SECONDS = int(os.getenv("TRACE_JOB_TIMEOUT_SECONDS", "1800"))
TRACE_WORKER_QUEUE = os.getenv("TRACE_WORKER_QUEUE", "trace-events")
TRACE_EVENTS_MAX_WORKERS = int(os.getenv("TRACE_EVENTS_MAX_WORKERS", "2"))
TRACE_STREAM_BATCH_SIZE = int(os.getenv("TRACE_STREAM_BATCH_SIZE", "5000"))
# ---------------------------------------------------------------------------
# RQ queue accessor
# ---------------------------------------------------------------------------
_RQ_AVAILABLE: Optional[bool] = None
def _check_rq_available() -> bool:
    """Return True when the ``rq`` package is importable; result is cached.

    The probe runs at most once per process; subsequent calls return the
    cached module-level value.
    """
    global _RQ_AVAILABLE
    if _RQ_AVAILABLE is not None:
        return _RQ_AVAILABLE
    try:
        import rq  # noqa: F401
    except ImportError:
        _RQ_AVAILABLE = False
    else:
        _RQ_AVAILABLE = True
    return _RQ_AVAILABLE
def is_async_available() -> bool:
    """Return True if RQ is installed and Redis is reachable."""
    if not _check_rq_available():
        return False
    return get_redis_client() is not None
def _get_rq_queue():
    """Return an RQ Queue bound to the trace queue, or None when unavailable.

    None means either the ``rq`` package is not installed or Redis is not
    reachable right now; callers treat both the same way.
    """
    conn = get_redis_client() if _check_rq_available() else None
    if conn is None:
        return None
    # Imported lazily so the module loads even without rq installed.
    from rq import Queue
    return Queue(TRACE_WORKER_QUEUE, connection=conn)
# ---------------------------------------------------------------------------
# Redis key helpers
# ---------------------------------------------------------------------------
# Each async job stores its state under a family of namespaced Redis keys
# (get_key applies the application-wide key prefix):
#   trace:job:{id}:meta                   - status hash (see get_job_status)
#   trace:job:{id}:result                 - legacy single-key JSON result
#   trace:job:{id}:result:meta            - chunk layout for chunked storage
#   trace:job:{id}:result:{domain}:{idx}  - one chunk of a domain's records
#   trace:job:{id}:result:aggregation     - aggregation payload (if any)

def _meta_key(job_id: str) -> str:
    return get_key(f"trace:job:{job_id}:meta")


def _result_key(job_id: str) -> str:
    return get_key(f"trace:job:{job_id}:result")


def _result_meta_key(job_id: str) -> str:
    return get_key(f"trace:job:{job_id}:result:meta")


def _result_chunk_key(job_id: str, domain: str, chunk_idx: int) -> str:
    return get_key(f"trace:job:{job_id}:result:{domain}:{chunk_idx}")


def _result_aggregation_key(job_id: str) -> str:
    return get_key(f"trace:job:{job_id}:result:aggregation")
# ---------------------------------------------------------------------------
# Public API: enqueue / status / result
# ---------------------------------------------------------------------------
def enqueue_trace_events_job(
    profile: str,
    container_ids: List[str],
    domains: List[str],
    payload: Dict[str, Any],
) -> Tuple[Optional[str], Optional[str]]:
    """Enqueue an async trace events job.

    Writes a status hash to Redis first (status="queued") so the job is
    immediately pollable, then enqueues the worker function on the RQ queue.
    The status hash is rolled back if the enqueue itself fails.

    Args:
        profile: Trace profile name.
        container_ids: CIDs to trace.
        domains: Event domains to fetch.
        payload: Original request payload, forwarded to the worker.

    Returns:
        (job_id, None) on success, (None, error_message) on failure.
    """
    queue = _get_rq_queue()
    if queue is None:
        return None, "async job queue unavailable (Redis or RQ not installed)"
    conn = get_redis_client()
    if conn is None:
        # Redis dropped between the queue check and now; fail gracefully
        # instead of raising AttributeError on conn.hset below.
        return None, "async job queue unavailable (Redis or RQ not installed)"
    job_id = f"trace-evt-{uuid.uuid4().hex[:12]}"
    # All values stored as strings: Redis hashes are flat string maps.
    meta = {
        "profile": profile,
        "cid_count": str(len(container_ids)),
        "domains": ",".join(domains),
        "status": "queued",
        "progress": "",
        "created_at": str(time.time()),
        "completed_at": "",
        "error": "",
    }
    conn.hset(_meta_key(job_id), mapping=meta)
    conn.expire(_meta_key(job_id), TRACE_JOB_TTL_SECONDS)
    try:
        queue.enqueue(
            execute_trace_events_job,
            job_id,
            profile,
            container_ids,
            domains,
            payload,
            job_id=job_id,
            job_timeout=TRACE_JOB_TIMEOUT_SECONDS,
            result_ttl=TRACE_JOB_TTL_SECONDS,
            failure_ttl=TRACE_JOB_TTL_SECONDS,
        )
    except Exception as exc:
        logger.error("Failed to enqueue trace job: %s", exc, exc_info=True)
        # Roll back the status hash so pollers do not see a ghost job.
        conn.delete(_meta_key(job_id))
        return None, f"enqueue failed: {exc}"
    logger.info(
        "trace job enqueued job_id=%s profile=%s cid_count=%s domains=%s",
        job_id, profile, len(container_ids), ",".join(domains),
    )
    return job_id, None
def get_job_status(job_id: str) -> Optional[Dict[str, Any]]:
    """Get trace job status from Redis metadata. Returns None if not found."""
    conn = get_redis_client()
    if conn is None:
        return None
    raw = conn.hgetall(_meta_key(job_id))
    if not raw:
        return None
    created = float(raw.get("created_at", 0))
    # Elapsed time is only meaningful once a real creation timestamp exists.
    elapsed = (time.time() - created) if created > 0 else 0
    domains_csv = raw.get("domains", "")
    status: Dict[str, Any] = {
        "job_id": job_id,
        "status": raw.get("status", "unknown"),
        "profile": raw.get("profile"),
        "cid_count": int(raw.get("cid_count", 0)),
        "domains": domains_csv.split(",") if domains_csv else [],
        "progress": raw.get("progress", ""),
        "created_at": created,
        "elapsed_seconds": round(elapsed, 1),
        # Empty-string error (the initial value) is normalized to None.
        "error": raw.get("error") or None,
    }
    return status
def get_job_result(
    job_id: str,
    domain: Optional[str] = None,
    offset: int = 0,
    limit: int = 0,
) -> Optional[Dict[str, Any]]:
    """Get completed job result from Redis.

    Prefers the chunked storage layout (new); falls back to the legacy
    single-key JSON blob. ``domain`` restricts the response to one domain,
    with ``offset``/``limit`` slicing that domain's rows.

    Returns:
        The result dict, or None when Redis is down or no result exists.
    """
    conn = get_redis_client()
    if conn is None:
        return None
    # New layout: presence of the result-meta key signals chunked storage.
    chunk_meta = conn.get(_result_meta_key(job_id))
    if chunk_meta is not None:
        return _get_chunked_result(conn, job_id, chunk_meta, domain, offset, limit)
    # Legacy layout: the whole result under a single key.
    raw = conn.get(_result_key(job_id))
    if raw is None:
        return None
    result = json.loads(raw)
    if not domain or "results" not in result:
        return result
    domain_data = result["results"].get(domain)
    if domain_data is None:
        # Unknown domain: hand back the full result unchanged.
        return result
    rows = domain_data.get("data", [])
    if offset > 0:
        rows = rows[offset:]
    if limit > 0:
        rows = rows[:limit]
    result["results"] = {
        domain: {"data": rows, "count": len(rows), "total": domain_data.get("count", 0)},
    }
    return result
def _get_chunked_result(
    conn,
    job_id: str,
    raw_meta: str,
    domain: Optional[str] = None,
    offset: int = 0,
    limit: int = 0,
) -> Dict[str, Any]:
    """Reconstruct result from chunked Redis keys.

    When ``domain`` is given only that domain is assembled; ``offset`` and
    ``limit`` are applied per domain after all chunks are concatenated.
    """
    meta = json.loads(raw_meta)
    domain_info = meta.get("domains", {})
    wanted = [domain] if domain else list(domain_info.keys())
    assembled: Dict[str, Any] = {}
    for name in wanted:
        info = domain_info.get(name)
        if info is None:
            continue
        rows: List[Dict[str, Any]] = []
        for idx in range(info.get("chunks", 0)):
            payload = conn.get(_result_chunk_key(job_id, name, idx))
            # Missing (expired) chunks are skipped rather than failing the read.
            if payload is not None:
                rows.extend(json.loads(payload))
        if offset > 0:
            rows = rows[offset:]
        if limit > 0:
            rows = rows[:limit]
        assembled[name] = {
            "data": rows,
            "count": len(rows),
            "total": info.get("total", 0),
        }
    raw_agg = conn.get(_result_aggregation_key(job_id))
    aggregation = json.loads(raw_agg) if raw_agg is not None else None
    result: Dict[str, Any] = {
        "stage": "events",
        "results": assembled,
        "aggregation": aggregation,
    }
    if meta.get("failed_domains"):
        # Mirror the synchronous endpoint's partial-failure response shape.
        result["error"] = "one or more domains failed"
        result["code"] = "EVENTS_PARTIAL_FAILURE"
        result["failed_domains"] = meta["failed_domains"]
    return result
# ---------------------------------------------------------------------------
# NDJSON streaming generator
# ---------------------------------------------------------------------------
def stream_job_result_ndjson(job_id: str):
    """Generator yielding NDJSON lines for a completed job's chunked result.

    Each line is a JSON object followed by ``\\n``. The protocol:

    1. ``{"type":"meta", ...}``
    2. For each domain:
       - ``{"type":"domain_start", "domain":"...", "total":N}``
       - ``{"type":"records", "domain":"...", "batch":i, "count":N, "data":[...]}``
       - ``{"type":"domain_end", "domain":"...", "count":N}``
    3. ``{"type":"aggregation", "data":{...}}`` (if present)
    4. ``{"type":"complete", "total_records":N}``

    Additional line types: ``{"type":"error", ...}`` when Redis or the result
    is missing, ``{"type":"full_result", ...}`` for legacy single-key results,
    and ``{"type":"warning", ...}`` before ``complete`` when domains failed.
    """
    conn = get_redis_client()
    if conn is None:
        yield _ndjson_line({"type": "error", "message": "Redis unavailable"})
        return
    raw_meta = conn.get(_result_meta_key(job_id))
    if raw_meta is None:
        # Fall back: legacy single-key result → emit as single NDJSON blob
        raw = conn.get(_result_key(job_id))
        if raw is None:
            yield _ndjson_line({"type": "error", "message": "result not found"})
            return
        result = json.loads(raw)
        yield _ndjson_line({"type": "full_result", "data": result})
        return
    meta = json.loads(raw_meta)
    domain_info = meta.get("domains", {})
    yield _ndjson_line({
        "type": "meta",
        "job_id": job_id,
        "profile": meta.get("profile"),
        "domains": list(domain_info.keys()),
    })
    total_records = 0
    for domain_name, info in domain_info.items():
        num_chunks = info.get("chunks", 0)
        domain_total = info.get("total", 0)
        yield _ndjson_line({
            "type": "domain_start",
            "domain": domain_name,
            "total": domain_total,
        })
        # domain_count tallies rows actually streamed; it can fall short of
        # domain_total when a chunk key has expired (skipped below).
        domain_count = 0
        for i in range(num_chunks):
            raw_chunk = conn.get(_result_chunk_key(job_id, domain_name, i))
            if raw_chunk is None:
                # Missing chunk: skip rather than abort the whole stream.
                continue
            rows = json.loads(raw_chunk)
            domain_count += len(rows)
            yield _ndjson_line({
                "type": "records",
                "domain": domain_name,
                "batch": i,
                "count": len(rows),
                "data": rows,
            })
        yield _ndjson_line({
            "type": "domain_end",
            "domain": domain_name,
            "count": domain_count,
        })
        total_records += domain_count
    raw_agg = conn.get(_result_aggregation_key(job_id))
    if raw_agg is not None:
        yield _ndjson_line({
            "type": "aggregation",
            "data": json.loads(raw_agg),
        })
    if meta.get("failed_domains"):
        yield _ndjson_line({
            "type": "warning",
            "code": "EVENTS_PARTIAL_FAILURE",
            "failed_domains": meta["failed_domains"],
        })
    yield _ndjson_line({
        "type": "complete",
        "total_records": total_records,
    })
def _ndjson_line(obj: Dict[str, Any]) -> str:
return json.dumps(obj, default=str, ensure_ascii=False) + "\n"
# ---------------------------------------------------------------------------
# Chunked result storage
# ---------------------------------------------------------------------------
def _store_chunked_result(
    conn,
    job_id: str,
    profile: str,
    results: Dict[str, Dict[str, Any]],
    aggregation: Optional[Dict[str, Any]],
    failed_domains: List[str],
) -> None:
    """Store job result as chunked Redis keys for streaming retrieval.

    Writes one key per TRACE_STREAM_BATCH_SIZE-sized slice of each domain's
    rows, an optional aggregation key, and finally the result-meta key that
    records per-domain chunk counts. Readers gate on the meta key, so writing
    it last means they never observe a partially stored result.
    """
    domain_info: Dict[str, Dict[str, Any]] = {}
    for name, data in results.items():
        rows = data.get("data", [])
        written = 0
        for start in range(0, len(rows), TRACE_STREAM_BATCH_SIZE):
            conn.setex(
                _result_chunk_key(job_id, name, written),
                TRACE_JOB_TTL_SECONDS,
                json.dumps(
                    rows[start:start + TRACE_STREAM_BATCH_SIZE],
                    default=str,
                    ensure_ascii=False,
                ),
            )
            written += 1
        domain_info[name] = {"chunks": written, "total": len(rows)}
    if aggregation is not None:
        conn.setex(
            _result_aggregation_key(job_id),
            TRACE_JOB_TTL_SECONDS,
            json.dumps(aggregation, default=str, ensure_ascii=False),
        )
    result_meta = {
        "profile": profile,
        "domains": domain_info,
        "failed_domains": sorted(failed_domains) if failed_domains else [],
    }
    conn.setex(
        _result_meta_key(job_id),
        TRACE_JOB_TTL_SECONDS,
        json.dumps(result_meta, default=str, ensure_ascii=False),
    )
# ---------------------------------------------------------------------------
# Worker entry point (runs in RQ worker process)
# ---------------------------------------------------------------------------
def _update_meta(job_id: str, **fields) -> None:
    """Write the given fields into the job's Redis metadata hash.

    All values are stringified because the hash is string-typed; silently
    no-ops when Redis is unavailable (status updates are best-effort).
    """
    conn = get_redis_client()
    if conn is None:
        return
    stringified = {name: str(value) for name, value in fields.items()}
    conn.hset(_meta_key(job_id), mapping=stringified)
def execute_trace_events_job(
    job_id: str,
    profile: str,
    container_ids: List[str],
    domains: List[str],
    payload: Dict[str, Any],
) -> None:
    """RQ worker entry point: execute trace events and store result in Redis.

    This function runs in a dedicated RQ worker process — outside gunicorn —
    so it does not compete for gunicorn worker threads or the DB connection pool.

    Args:
        job_id: Identifier issued by ``enqueue_trace_events_job``.
        profile: Trace profile name; ``mid_section_defect`` additionally
            builds the MSD aggregation from the raw per-CID events.
        container_ids: Container IDs to fetch events for.
        domains: Event domains; each is fetched on its own thread-pool task.
        payload: Original request payload (consumed by the MSD aggregation).

    Raises:
        Exception: any fatal error is re-raised after marking the job
            ``failed`` so RQ also records the failure.
    """
    # Imported lazily so the worker process only pays for it when a job runs.
    from mes_dashboard.services.event_fetcher import EventFetcher
    logger.info(
        "trace job started job_id=%s profile=%s cid_count=%s domains=%s",
        job_id, profile, len(container_ids), ",".join(domains),
    )
    _update_meta(job_id, status="started", progress="fetching events")
    try:
        results: Dict[str, Dict[str, Any]] = {}
        # Raw per-CID events are kept only for the MSD profile, which needs
        # them to build the aggregation further down.
        raw_domain_results: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        failed_domains: List[str] = []
        is_msd = (profile == "mid_section_defect")
        with ThreadPoolExecutor(
            max_workers=min(len(domains), TRACE_EVENTS_MAX_WORKERS),
        ) as executor:
            futures = {
                executor.submit(EventFetcher.fetch_events, container_ids, domain): domain
                for domain in domains
            }
            for future in as_completed(futures):
                domain = futures[future]
                try:
                    events_by_cid = future.result()
                    rows = _flatten_domain_records(events_by_cid)
                    results[domain] = {"data": rows, "count": len(rows)}
                    if is_msd:
                        raw_domain_results[domain] = events_by_cid
                except Exception as exc:
                    # One failed domain is recorded as a partial failure; the
                    # job still finishes with the remaining domains.
                    logger.error(
                        "trace job domain failed job_id=%s domain=%s: %s",
                        job_id, domain, exc, exc_info=True,
                    )
                    failed_domains.append(domain)
        _update_meta(job_id, progress="building response")
        aggregation = None
        if is_msd:
            aggregation, agg_error = _build_job_msd_aggregation(payload, raw_domain_results)
            # Drop the raw events as soon as the aggregation is built — they
            # are the largest in-memory structure in this function.
            del raw_domain_results
            if agg_error is not None:
                raise RuntimeError(agg_error)
        else:
            del raw_domain_results
        # Store result in Redis as chunked keys for streaming retrieval
        conn = get_redis_client()
        if conn is not None:
            _store_chunked_result(
                conn, job_id, profile, results, aggregation, failed_domains,
            )
        _update_meta(
            job_id,
            status="finished",
            progress="complete",
            completed_at=time.time(),
        )
        logger.info(
            "trace job completed job_id=%s profile=%s domains=%s",
            job_id, profile, ",".join(domains),
        )
        # Large jobs: proactively reclaim memory before the worker idles.
        if len(container_ids) > 10000:
            gc.collect()
    except Exception as exc:
        logger.error(
            "trace job failed job_id=%s: %s", job_id, exc, exc_info=True,
        )
        _update_meta(
            job_id,
            status="failed",
            error=str(exc),
            completed_at=time.time(),
        )
        # Re-raise so RQ marks the job as failed too.
        raise
# ---------------------------------------------------------------------------
# Helpers (duplicated from trace_routes to avoid Flask dependency in worker)
# ---------------------------------------------------------------------------
def _flatten_domain_records(
events_by_cid: Dict[str, List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for records in events_by_cid.values():
if not isinstance(records, list):
continue
for row in records:
if isinstance(row, dict):
rows.append(row)
return rows
def _build_job_msd_aggregation(
    payload: Dict[str, Any],
    domain_results: Dict[str, Dict[str, List[Dict[str, Any]]]],
) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """Build MSD aggregation inside worker process (no Flask context).

    Args:
        payload: Original request payload; must carry a ``params`` dict.
        domain_results: Raw per-domain, per-CID events collected by the worker.

    Returns:
        (aggregation, None) on success, (None, error_message) on failure.
    """
    # Imported lazily to keep the worker free of service imports at load time.
    from mes_dashboard.services.mid_section_defect_service import (
        build_trace_aggregation_from_events,
        parse_loss_reasons_param,
    )
    params = payload.get("params")
    if not isinstance(params, dict):
        return None, "params is required for mid_section_defect profile"
    mode = str(params.get("mode") or "date_range").strip()
    start_date = None
    end_date = None
    # Container mode needs no dates; otherwise accept a two-element
    # ``date_range`` list first, then fall back to start_date/end_date.
    if mode != "container":
        date_range = params.get("date_range")
        if isinstance(date_range, list) and len(date_range) == 2:
            start_date = str(date_range[0] or "").strip()
            end_date = str(date_range[1] or "").strip()
        if not start_date or not end_date:
            start_date = str(params.get("start_date") or "").strip()
            end_date = str(params.get("end_date") or "").strip()
        if not start_date or not end_date:
            return None, "start_date/end_date is required in params"
    raw_loss_reasons = params.get("loss_reasons")
    loss_reasons = parse_loss_reasons_param(raw_loss_reasons)
    lineage = payload.get("lineage") or {}
    lineage_ancestors = lineage.get("ancestors") if isinstance(lineage, dict) else None
    lineage_roots = lineage.get("seed_roots") if isinstance(lineage, dict) else None
    # Fall back to the lineage ancestor keys when the payload carries no
    # explicit seed container IDs; then normalize to stripped, non-empty strs.
    seed_container_ids = payload.get("seed_container_ids", [])
    if not seed_container_ids and isinstance(lineage_ancestors, dict):
        seed_container_ids = list(lineage_ancestors.keys())
    seed_container_ids = [
        s.strip() for s in seed_container_ids
        if isinstance(s, str) and s.strip()
    ]
    # Missing domains default to empty mappings so aggregation can proceed.
    upstream_events = domain_results.get("upstream_history", {})
    materials_events = domain_results.get("materials", {})
    downstream_events = domain_results.get("downstream_rejects", {})
    station = str(params.get("station") or "測試").strip()
    direction = str(params.get("direction") or "backward").strip()
    aggregation = build_trace_aggregation_from_events(
        start_date,
        end_date,
        loss_reasons=loss_reasons,
        seed_container_ids=seed_container_ids,
        lineage_ancestors=lineage_ancestors,
        lineage_roots=lineage_roots,
        upstream_events_by_cid=upstream_events,
        materials_events_by_cid=materials_events,
        downstream_events_by_cid=downstream_events,
        station=station,
        direction=direction,
        mode=mode,
    )
    if aggregation is None:
        return None, "aggregation service unavailable"
    if "error" in aggregation:
        return None, str(aggregation["error"])
    return aggregation, None

View File

@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
"""Unit tests for read_sql_df_slow_iter (fetchmany iterator)."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import mes_dashboard.core.database as db
@patch.object(db, "oracledb")
@patch.object(db, "_get_slow_query_semaphore")
@patch.object(db, "get_db_runtime_config")
def test_slow_iter_yields_batches(mock_runtime, mock_sem_fn, mock_oracledb):
    """read_sql_df_slow_iter should yield (columns, rows) batches via fetchmany."""
    # NOTE: @patch decorators apply bottom-up, so parameters arrive in the
    # reverse of decorator order (get_db_runtime_config is the first arg).
    mock_runtime.return_value = {
        "slow_call_timeout_ms": 60000,
        "slow_fetchmany_size": 2,
        "tcp_connect_timeout": 10,
        "retry_count": 1,
        "retry_delay": 1.0,
    }
    sem = MagicMock()
    sem.acquire.return_value = True
    mock_sem_fn.return_value = sem
    cursor = MagicMock()
    cursor.description = [("COL_A",), ("COL_B",)]
    # Two non-empty batches, then an empty batch which ends iteration.
    cursor.fetchmany.side_effect = [
        [("r1a", "r1b"), ("r2a", "r2b")],
        [("r3a", "r3b")],
        [],
    ]
    conn = MagicMock()
    conn.cursor.return_value = cursor
    mock_oracledb.connect.return_value = conn
    batches = list(db.read_sql_df_slow_iter("SELECT 1", {"p0": "x"}, batch_size=2))
    assert len(batches) == 2
    assert batches[0] == (["COL_A", "COL_B"], [("r1a", "r1b"), ("r2a", "r2b")])
    assert batches[1] == (["COL_A", "COL_B"], [("r3a", "r3b")])
    # fetchmany must be driven with the requested batch size.
    cursor.fetchmany.assert_called_with(2)
    # Connection and slow-query semaphore must be released after exhaustion.
    conn.close.assert_called_once()
    sem.release.assert_called_once()
@patch.object(db, "oracledb")
@patch.object(db, "_get_slow_query_semaphore")
@patch.object(db, "get_db_runtime_config")
def test_slow_iter_empty_result(mock_runtime, mock_sem_fn, mock_oracledb):
    """read_sql_df_slow_iter should yield nothing for empty result."""
    mock_runtime.return_value = {
        "slow_call_timeout_ms": 60000,
        "slow_fetchmany_size": 5000,
        "tcp_connect_timeout": 10,
        "retry_count": 1,
        "retry_delay": 1.0,
    }
    sem = MagicMock()
    sem.acquire.return_value = True
    mock_sem_fn.return_value = sem
    cursor = MagicMock()
    cursor.description = [("ID",)]
    # First fetchmany already returns [] → generator terminates immediately.
    cursor.fetchmany.return_value = []
    conn = MagicMock()
    conn.cursor.return_value = cursor
    mock_oracledb.connect.return_value = conn
    batches = list(db.read_sql_df_slow_iter("SELECT 1"))
    assert batches == []
    # Resources must still be released for an empty result set.
    conn.close.assert_called_once()
    sem.release.assert_called_once()
@patch.object(db, "oracledb")
@patch.object(db, "_get_slow_query_semaphore")
@patch.object(db, "get_db_runtime_config")
def test_slow_iter_releases_on_error(mock_runtime, mock_sem_fn, mock_oracledb):
    """Semaphore and connection should be released even on error."""
    mock_runtime.return_value = {
        "slow_call_timeout_ms": 60000,
        "slow_fetchmany_size": 5000,
        "tcp_connect_timeout": 10,
        "retry_count": 1,
        "retry_delay": 1.0,
    }
    sem = MagicMock()
    sem.acquire.return_value = True
    mock_sem_fn.return_value = sem
    conn = MagicMock()
    conn.cursor.side_effect = RuntimeError("cursor failed")
    mock_oracledb.connect.return_value = conn
    # Bug fix: the old `try/except RuntimeError: pass` would silently pass
    # even if no exception was raised at all. Assert that the error actually
    # propagates to the caller before checking resource cleanup.
    raised = False
    try:
        list(db.read_sql_df_slow_iter("SELECT 1"))
    except RuntimeError:
        raised = True
    assert raised, "RuntimeError from cursor() should propagate to the caller"
    conn.close.assert_called_once()
    sem.release.assert_called_once()
def test_runtime_config_includes_fetchmany_size():
    """get_db_runtime_config should include slow_fetchmany_size."""
    # Force refresh to pick up current config
    # NOTE(review): this clears the module-level cache; refresh=True then
    # repopulates it, so later tests see a freshly loaded config.
    db._DB_RUNTIME_CONFIG = None
    runtime = db.get_db_runtime_config(refresh=True)
    assert "slow_fetchmany_size" in runtime
    assert isinstance(runtime["slow_fetchmany_size"], int)
    assert runtime["slow_fetchmany_size"] > 0

View File

@@ -5,11 +5,21 @@ from __future__ import annotations
from unittest.mock import patch
import pandas as pd
from mes_dashboard.services.event_fetcher import EventFetcher
def _iter_result(columns, rows):
"""Helper: create a generator that yields a single (columns, rows) batch."""
def _gen(*args, **kwargs):
yield columns, rows
return _gen
def _iter_empty(*args, **kwargs):
"""Helper: generator that yields nothing (empty result)."""
return iter([])
def test_cache_key_is_stable_for_sorted_ids():
key1 = EventFetcher._cache_key("history", ["CID-B", "CID-A", "CID-A"])
key2 = EventFetcher._cache_key("history", ["CID-A", "CID-B"])
@@ -29,98 +39,96 @@ def test_get_rate_limit_config_supports_env_override(monkeypatch):
assert config["window_seconds"] == 77
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.cache_get")
def test_fetch_events_cache_hit_skips_db(mock_cache_get, mock_read_sql_df):
def test_fetch_events_cache_hit_skips_db(mock_cache_get, mock_iter):
mock_cache_get.return_value = {"CID-1": [{"CONTAINERID": "CID-1"}]}
result = EventFetcher.fetch_events(["CID-1"], "materials")
assert result["CID-1"][0]["CONTAINERID"] == "CID-1"
mock_read_sql_df.assert_not_called()
mock_iter.assert_not_called()
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load_with_params")
def test_fetch_events_upstream_history_branch(
mock_sql_load,
mock_read_sql_df,
mock_iter,
_mock_cache_get,
mock_cache_set,
):
mock_sql_load.return_value = "SELECT * FROM UPSTREAM"
mock_read_sql_df.return_value = pd.DataFrame(
[
{"CONTAINERID": "CID-1", "WORKCENTER_GROUP": "DB"},
{"CONTAINERID": "CID-2", "WORKCENTER_GROUP": "WB"},
]
mock_iter.side_effect = _iter_result(
["CONTAINERID", "WORKCENTER_GROUP"],
[("CID-1", "DB"), ("CID-2", "WB")],
)
result = EventFetcher.fetch_events(["CID-1", "CID-2"], "upstream_history")
assert sorted(result.keys()) == ["CID-1", "CID-2"]
assert mock_sql_load.call_args.args[0] == "mid_section_defect/upstream_history"
_, params = mock_read_sql_df.call_args.args
assert len(params) == 2
sql_arg, params_arg = mock_iter.call_args.args
assert len(params_arg) == 2
mock_cache_set.assert_called_once()
assert mock_cache_set.call_args.args[0].startswith("evt:upstream_history:")
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_history_branch_replaces_container_filter(
mock_sql_load,
mock_read_sql_df,
mock_iter,
_mock_cache_get,
_mock_cache_set,
):
mock_sql_load.return_value = (
"SELECT * FROM t WHERE h.CONTAINERID = :container_id {{ WORKCENTER_FILTER }}"
)
mock_read_sql_df.return_value = pd.DataFrame([])
mock_iter.side_effect = _iter_empty
EventFetcher.fetch_events(["CID-1"], "history")
sql, params = mock_read_sql_df.call_args.args
assert "h.CONTAINERID = :container_id" not in sql
assert "{{ WORKCENTER_FILTER }}" not in sql
assert params == {"p0": "CID-1"}
sql_arg, params_arg = mock_iter.call_args.args
assert "h.CONTAINERID = :container_id" not in sql_arg
assert "{{ WORKCENTER_FILTER }}" not in sql_arg
assert params_arg == {"p0": "CID-1"}
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_materials_branch_replaces_aliased_container_filter(
mock_sql_load,
mock_read_sql_df,
mock_iter,
_mock_cache_get,
_mock_cache_set,
):
mock_sql_load.return_value = (
"SELECT * FROM t m WHERE m.CONTAINERID = :container_id ORDER BY TXNDATE"
)
mock_read_sql_df.return_value = pd.DataFrame([])
mock_iter.side_effect = _iter_empty
EventFetcher.fetch_events(["CID-1", "CID-2"], "materials")
sql, params = mock_read_sql_df.call_args.args
assert "m.CONTAINERID = :container_id" not in sql
assert "IN" in sql.upper()
assert params == {"p0": "CID-1", "p1": "CID-2"}
sql_arg, params_arg = mock_iter.call_args.args
assert "m.CONTAINERID = :container_id" not in sql_arg
assert "IN" in sql_arg.upper()
assert params_arg == {"p0": "CID-1", "p1": "CID-2"}
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_rejects_branch_replaces_aliased_container_filter(
mock_sql_load,
mock_read_sql_df,
mock_iter,
_mock_cache_get,
_mock_cache_set,
):
@@ -128,23 +136,23 @@ def test_fetch_events_rejects_branch_replaces_aliased_container_filter(
"SELECT * FROM t r LEFT JOIN c ON c.CONTAINERID = r.CONTAINERID "
"WHERE r.CONTAINERID = :container_id ORDER BY r.TXNDATE"
)
mock_read_sql_df.return_value = pd.DataFrame([])
mock_iter.side_effect = _iter_empty
EventFetcher.fetch_events(["CID-1", "CID-2"], "rejects")
sql, params = mock_read_sql_df.call_args.args
assert "r.CONTAINERID = :container_id" not in sql
assert "IN" in sql.upper()
assert params == {"p0": "CID-1", "p1": "CID-2"}
sql_arg, params_arg = mock_iter.call_args.args
assert "r.CONTAINERID = :container_id" not in sql_arg
assert "IN" in sql_arg.upper()
assert params_arg == {"p0": "CID-1", "p1": "CID-2"}
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df")
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load")
def test_fetch_events_holds_branch_replaces_aliased_container_filter(
mock_sql_load,
mock_read_sql_df,
mock_iter,
_mock_cache_get,
_mock_cache_set,
):
@@ -152,19 +160,41 @@ def test_fetch_events_holds_branch_replaces_aliased_container_filter(
"SELECT * FROM t h LEFT JOIN c ON c.CONTAINERID = h.CONTAINERID "
"WHERE h.CONTAINERID = :container_id ORDER BY h.HOLDTXNDATE DESC"
)
mock_read_sql_df.return_value = pd.DataFrame([])
mock_iter.side_effect = _iter_empty
EventFetcher.fetch_events(["CID-1", "CID-2"], "holds")
sql, params = mock_read_sql_df.call_args.args
assert "h.CONTAINERID = :container_id" not in sql
assert "IN" in sql.upper()
assert params == {"p0": "CID-1", "p1": "CID-2"}
sql_arg, params_arg = mock_iter.call_args.args
assert "h.CONTAINERID = :container_id" not in sql_arg
assert "IN" in sql_arg.upper()
assert params_arg == {"p0": "CID-1", "p1": "CID-2"}
def test_event_fetcher_uses_slow_connection():
"""Regression: event_fetcher must use read_sql_df_slow (non-pooled)."""
def test_event_fetcher_uses_slow_iter_connection():
"""Regression: event_fetcher must use read_sql_df_slow_iter (non-pooled)."""
import mes_dashboard.services.event_fetcher as ef
from mes_dashboard.core.database import read_sql_df_slow
from mes_dashboard.core.database import read_sql_df_slow_iter
assert ef.read_sql_df is read_sql_df_slow
assert ef.read_sql_df_slow_iter is read_sql_df_slow_iter
@patch("mes_dashboard.services.event_fetcher.cache_set")
@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None)
@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter")
@patch("mes_dashboard.services.event_fetcher.SQLLoader.load_with_params")
def test_fetch_events_sanitizes_nan_values(
mock_sql_load,
mock_iter,
_mock_cache_get,
_mock_cache_set,
):
"""NaN float values in records should be replaced with None."""
mock_sql_load.return_value = "SELECT * FROM UPSTREAM"
mock_iter.side_effect = _iter_result(
["CONTAINERID", "VALUE"],
[("CID-1", float("nan"))],
)
result = EventFetcher.fetch_events(["CID-1"], "upstream_history")
assert result["CID-1"][0]["VALUE"] is None

View File

@@ -0,0 +1,507 @@
# -*- coding: utf-8 -*-
"""Unit tests for trace_job_service (async trace job queue)."""
from __future__ import annotations
import json
from unittest.mock import MagicMock, patch
import pytest
import mes_dashboard.services.trace_job_service as tjs
# ---------------------------------------------------------------------------
# is_async_available
# ---------------------------------------------------------------------------
def test_is_async_available_true():
    """Should return True when rq is importable and Redis is up."""
    # Bug fix: these tests mutated the module-level _RQ_AVAILABLE cache and
    # never restored it, leaking state into whatever test ran next. Save the
    # prior value and restore it in a finally block.
    original = tjs._RQ_AVAILABLE
    tjs._RQ_AVAILABLE = None  # reset cached flag so availability is re-probed
    try:
        with patch.object(tjs, "get_redis_client", return_value=MagicMock()):
            assert tjs.is_async_available() is True
    finally:
        tjs._RQ_AVAILABLE = original


def test_is_async_available_false_no_redis():
    """Should return False when Redis is unavailable."""
    original = tjs._RQ_AVAILABLE
    tjs._RQ_AVAILABLE = True
    try:
        with patch.object(tjs, "get_redis_client", return_value=None):
            assert tjs.is_async_available() is False
    finally:
        # Restore the cached flag so test order cannot affect other tests.
        tjs._RQ_AVAILABLE = original
# ---------------------------------------------------------------------------
# enqueue_trace_events_job
# ---------------------------------------------------------------------------
@patch.object(tjs, "_get_rq_queue")
@patch.object(tjs, "get_redis_client")
def test_enqueue_success(mock_redis, mock_queue_fn):
    """Enqueue should return a job_id and store metadata in Redis."""
    conn = MagicMock()
    mock_redis.return_value = conn
    queue = MagicMock()
    mock_queue_fn.return_value = queue
    job_id, err = tjs.enqueue_trace_events_job(
        "query_tool", ["CID-1", "CID-2"], ["history"], {"params": {}},
    )
    assert job_id is not None
    assert job_id.startswith("trace-evt-")
    assert err is None
    queue.enqueue.assert_called_once()
    # Metadata hash must be written and given a TTL.
    conn.hset.assert_called_once()
    conn.expire.assert_called_once()


@patch.object(tjs, "_get_rq_queue", return_value=None)
def test_enqueue_no_queue(mock_queue_fn):
    """Enqueue should return error when queue is unavailable."""
    job_id, err = tjs.enqueue_trace_events_job(
        "query_tool", ["CID-1"], ["history"], {},
    )
    assert job_id is None
    assert "unavailable" in err


@patch.object(tjs, "_get_rq_queue")
@patch.object(tjs, "get_redis_client")
def test_enqueue_queue_error(mock_redis, mock_queue_fn):
    """Enqueue should return error when queue.enqueue raises."""
    conn = MagicMock()
    mock_redis.return_value = conn
    queue = MagicMock()
    queue.enqueue.side_effect = RuntimeError("connection refused")
    mock_queue_fn.return_value = queue
    job_id, err = tjs.enqueue_trace_events_job(
        "query_tool", ["CID-1"], ["history"], {},
    )
    assert job_id is None
    # The original exception text must surface in the error message.
    assert "connection refused" in err
    # Meta key should be cleaned up
    conn.delete.assert_called_once()
# ---------------------------------------------------------------------------
# get_job_status
# ---------------------------------------------------------------------------
@patch.object(tjs, "get_redis_client")
def test_get_job_status_found(mock_redis):
    """Should return status dict from Redis hash."""
    conn = MagicMock()
    # Redis hash values are strings; get_job_status must convert them back
    # (cid_count → int, domains CSV → list, empty error → None).
    conn.hgetall.return_value = {
        "profile": "query_tool",
        "cid_count": "100",
        "domains": "history,materials",
        "status": "started",
        "progress": "fetching events",
        "created_at": "1740000000.0",
        "completed_at": "",
        "error": "",
    }
    mock_redis.return_value = conn
    status = tjs.get_job_status("trace-evt-abc123")
    assert status is not None
    assert status["job_id"] == "trace-evt-abc123"
    assert status["status"] == "started"
    assert status["cid_count"] == 100
    assert status["domains"] == ["history", "materials"]
    assert status["error"] is None


@patch.object(tjs, "get_redis_client")
def test_get_job_status_not_found(mock_redis):
    """Should return None when job metadata does not exist."""
    conn = MagicMock()
    # hgetall returns an empty dict for a missing key.
    conn.hgetall.return_value = {}
    mock_redis.return_value = conn
    assert tjs.get_job_status("trace-evt-nonexistent") is None
# ---------------------------------------------------------------------------
# get_job_result
# ---------------------------------------------------------------------------
@patch.object(tjs, "get_redis_client")
def test_get_job_result_found_chunked(mock_redis):
    """Should return reconstructed result from chunked Redis keys."""
    result_meta = {
        "profile": "query_tool",
        "domains": {"history": {"chunks": 1, "total": 1}},
        "failed_domains": [],
    }
    chunk_data = [{"CONTAINERID": "CID-1"}]
    conn = MagicMock()

    # Route conn.get by key suffix: meta, then one chunk, no aggregation.
    def _get_side_effect(key):
        if key.endswith(":result:meta"):
            return json.dumps(result_meta)
        if key.endswith(":result:history:0"):
            return json.dumps(chunk_data)
        if key.endswith(":result:aggregation"):
            return None
        return None
    conn.get.side_effect = _get_side_effect
    mock_redis.return_value = conn
    result = tjs.get_job_result("trace-evt-abc123")
    assert result is not None
    assert result["stage"] == "events"
    assert result["results"]["history"]["count"] == 1
    assert result["results"]["history"]["total"] == 1


@patch.object(tjs, "get_redis_client")
def test_get_job_result_found_legacy(mock_redis):
    """Should fall back to legacy single-key result when no chunked meta exists."""
    result_data = {
        "stage": "events",
        "results": {"history": {"data": [{"CONTAINERID": "CID-1"}], "count": 1}},
        "aggregation": None,
    }
    conn = MagicMock()

    # Meta key absent → get_job_result must read the legacy :result key.
    def _get_side_effect(key):
        if key.endswith(":result:meta"):
            return None  # no chunked meta
        if key.endswith(":result"):
            return json.dumps(result_data)
        return None
    conn.get.side_effect = _get_side_effect
    mock_redis.return_value = conn
    result = tjs.get_job_result("trace-evt-abc123")
    assert result is not None
    assert result["stage"] == "events"
    assert result["results"]["history"]["count"] == 1


@patch.object(tjs, "get_redis_client")
def test_get_job_result_not_found(mock_redis):
    """Should return None when result key does not exist."""
    conn = MagicMock()
    conn.get.return_value = None
    mock_redis.return_value = conn
    assert tjs.get_job_result("trace-evt-expired") is None


@patch.object(tjs, "get_redis_client")
def test_get_job_result_with_domain_filter(mock_redis):
    """Should return filtered result with pagination from chunked storage."""
    result_meta = {
        "profile": "query_tool",
        "domains": {
            "history": {"chunks": 1, "total": 3},
            "materials": {"chunks": 1, "total": 1},
        },
        "failed_domains": [],
    }
    history_chunk = [{"id": 1}, {"id": 2}, {"id": 3}]
    materials_chunk = [{"id": 10}]
    conn = MagicMock()

    def _get_side_effect(key):
        if key.endswith(":result:meta"):
            return json.dumps(result_meta)
        if key.endswith(":result:history:0"):
            return json.dumps(history_chunk)
        if key.endswith(":result:materials:0"):
            return json.dumps(materials_chunk)
        if key.endswith(":result:aggregation"):
            return None
        return None
    conn.get.side_effect = _get_side_effect
    mock_redis.return_value = conn
    # domain filter drops "materials"; offset=1/limit=1 selects the middle row.
    result = tjs.get_job_result("trace-evt-abc", domain="history", offset=1, limit=1)
    assert "history" in result["results"]
    assert "materials" not in result["results"]
    assert result["results"]["history"]["data"] == [{"id": 2}]
    # "total" reports the unfiltered row count, not the page size.
    assert result["results"]["history"]["total"] == 3
# ---------------------------------------------------------------------------
# execute_trace_events_job (worker entry point)
# ---------------------------------------------------------------------------
@patch.object(tjs, "get_redis_client")
@patch("mes_dashboard.services.event_fetcher.EventFetcher.fetch_events")
def test_execute_job_success(mock_fetch, mock_redis):
"""Worker should fetch events, store result, and update meta to finished."""
mock_fetch.return_value = {"CID-1": [{"CONTAINERID": "CID-1"}]}
conn = MagicMock()
mock_redis.return_value = conn
tjs.execute_trace_events_job(
"test-job-1", "query_tool", ["CID-1"], ["history"], {},
)
mock_fetch.assert_called_once_with(["CID-1"], "history")
# Result should be stored as chunked keys (chunk + result meta)
setex_calls = [c for c in conn.method_calls if c[0] == "setex"]
assert len(setex_calls) == 2 # 1 chunk + 1 result meta
# Find the result meta setex call
result_meta_call = [c for c in setex_calls if ":result:meta" in str(c)]
assert len(result_meta_call) == 1
stored_meta = json.loads(result_meta_call[0][1][2])
assert "history" in stored_meta["domains"]
assert stored_meta["domains"]["history"]["total"] == 1
# Find the chunk setex call
chunk_call = [c for c in setex_calls if ":result:history:0" in str(c)]
assert len(chunk_call) == 1
stored_chunk = json.loads(chunk_call[0][1][2])
assert len(stored_chunk) == 1
assert stored_chunk[0]["CONTAINERID"] == "CID-1"
# Job meta should be updated to finished
hset_calls = [c for c in conn.method_calls if c[0] == "hset"]
last_meta = hset_calls[-1][2]["mapping"]
assert last_meta["status"] == "finished"
@patch.object(tjs, "get_redis_client")
@patch("mes_dashboard.services.event_fetcher.EventFetcher.fetch_events")
def test_execute_job_domain_failure_records_partial(mock_fetch, mock_redis):
    """A failing domain fetch yields a partial result, not a crashed job."""
    mock_fetch.side_effect = RuntimeError("db timeout")
    fake_conn = MagicMock()
    mock_redis.return_value = fake_conn

    tjs.execute_trace_events_job(
        "test-job-2", "query_tool", ["CID-1"], ["history"], {},
    )

    # With the only domain failed, just the result meta is written (no chunks).
    setex_writes = [call for call in fake_conn.method_calls if call[0] == "setex"]
    assert len(setex_writes) == 1  # only result meta (no chunks since domain failed)
    meta_payload = json.loads(setex_writes[0][1][2])
    assert "history" in meta_payload["failed_domains"]

    # Partial failure still ends with the job marked finished.
    hset_writes = [call for call in fake_conn.method_calls if call[0] == "hset"]
    final_mapping = hset_writes[-1][2]["mapping"]
    assert final_mapping["status"] == "finished"
# ---------------------------------------------------------------------------
# _store_chunked_result
# ---------------------------------------------------------------------------
@patch.object(tjs, "get_redis_client")
def test_store_chunked_result_splits_batches(mock_redis):
    """A large domain payload is split across multiple chunk keys."""
    fake_conn = MagicMock()
    mock_redis.return_value = fake_conn

    # 12 records with batch size 5 → chunks of 5, 5, and 2.
    records = [{"id": i} for i in range(12)]
    domain_results = {"history": {"data": records, "count": 12}}

    saved_batch_size = tjs.TRACE_STREAM_BATCH_SIZE
    tjs.TRACE_STREAM_BATCH_SIZE = 5
    try:
        tjs._store_chunked_result(fake_conn, "job-1", "query_tool", domain_results, None, [])
    finally:
        # Always restore the module-level batch size for other tests.
        tjs.TRACE_STREAM_BATCH_SIZE = saved_batch_size

    setex_writes = [call for call in fake_conn.method_calls if call[0] == "setex"]
    assert len(setex_writes) == 4  # 3 chunk keys + 1 result meta

    meta_writes = [call for call in setex_writes if ":result:meta" in str(call)]
    assert len(meta_writes) == 1
    meta_payload = json.loads(meta_writes[0][1][2])
    assert meta_payload["domains"]["history"]["chunks"] == 3
    assert meta_payload["domains"]["history"]["total"] == 12

    chunk_writes = [call for call in setex_writes if ":result:history:" in str(call)]
    assert len(chunk_writes) == 3
    first_chunk = json.loads(chunk_writes[0][1][2])
    assert len(first_chunk) == 5
@patch.object(tjs, "get_redis_client")
def test_store_chunked_result_with_aggregation(mock_redis):
    """Aggregation payloads are written under their own dedicated key."""
    fake_conn = MagicMock()
    mock_redis.return_value = fake_conn

    domain_results = {"history": {"data": [{"id": 1}], "count": 1}}
    aggregation = {"summary": {"total": 100}}
    tjs._store_chunked_result(fake_conn, "job-1", "mid_section_defect", domain_results, aggregation, [])

    setex_writes = [call for call in fake_conn.method_calls if call[0] == "setex"]
    agg_writes = [call for call in setex_writes if ":result:aggregation" in str(call)]
    assert len(agg_writes) == 1
    agg_payload = json.loads(agg_writes[0][1][2])
    assert agg_payload["summary"]["total"] == 100
# ---------------------------------------------------------------------------
# stream_job_result_ndjson
# ---------------------------------------------------------------------------
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_chunked(mock_redis):
    """Chunked results stream as meta/domain_start/records.../domain_end/complete."""
    result_meta = {
        "profile": "query_tool",
        "domains": {"history": {"chunks": 2, "total": 3}},
        "failed_domains": [],
    }
    # Map of Redis key suffixes to stored JSON payloads.
    stored = {
        ":result:meta": json.dumps(result_meta),
        ":result:history:0": json.dumps([{"id": 1}, {"id": 2}]),
        ":result:history:1": json.dumps([{"id": 3}]),
    }

    fake_conn = MagicMock()
    fake_conn.get.side_effect = lambda key: next(
        (value for suffix, value in stored.items() if key.endswith(suffix)), None,
    )
    mock_redis.return_value = fake_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    assert [entry["type"] for entry in parsed] == [
        "meta", "domain_start", "records", "records", "domain_end", "complete",
    ]
    assert parsed[0]["domains"] == ["history"]
    assert parsed[1]["domain"] == "history"
    assert parsed[1]["total"] == 3
    assert parsed[2]["count"] == 2
    assert parsed[3]["count"] == 1
    assert parsed[4]["count"] == 3
    assert parsed[5]["total_records"] == 3
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_with_aggregation(mock_redis):
    """When an aggregation key exists, the stream carries an aggregation line."""
    result_meta = {
        "profile": "mid_section_defect",
        "domains": {"upstream_history": {"chunks": 1, "total": 1}},
        "failed_domains": [],
    }
    stored = {
        ":result:meta": json.dumps(result_meta),
        ":result:upstream_history:0": json.dumps([{"id": 1}]),
        ":result:aggregation": json.dumps({"summary": "ok"}),
    }

    fake_conn = MagicMock()
    fake_conn.get.side_effect = lambda key: next(
        (value for suffix, value in stored.items() if key.endswith(suffix)), None,
    )
    mock_redis.return_value = fake_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    agg_entries = [entry for entry in parsed if entry["type"] == "aggregation"]
    assert agg_entries
    assert agg_entries[0]["data"]["summary"] == "ok"
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_legacy_fallback(mock_redis):
    """Without chunked meta, the stream falls back to a single full_result line."""
    legacy_result = {
        "stage": "events",
        "results": {"history": {"data": [{"id": 1}], "count": 1}},
        "aggregation": None,
    }

    fake_conn = MagicMock()

    def _fake_get(key):
        # No ":result:meta" key is stored — only the legacy single ":result" key.
        if key.endswith(":result"):
            return json.dumps(legacy_result)
        return None

    fake_conn.get.side_effect = _fake_get
    mock_redis.return_value = fake_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    assert len(parsed) == 1
    assert parsed[0]["type"] == "full_result"
    assert parsed[0]["data"]["stage"] == "events"
@patch.object(tjs, "get_redis_client")
def test_stream_ndjson_with_failed_domains(mock_redis):
    """Partial domain failures surface as a warning line in the stream."""
    result_meta = {
        "profile": "query_tool",
        "domains": {"materials": {"chunks": 1, "total": 1}},
        "failed_domains": ["history"],
    }
    stored = {
        ":result:meta": json.dumps(result_meta),
        ":result:materials:0": json.dumps([{"id": 1}]),
    }

    fake_conn = MagicMock()
    fake_conn.get.side_effect = lambda key: next(
        (value for suffix, value in stored.items() if key.endswith(suffix)), None,
    )
    mock_redis.return_value = fake_conn

    parsed = [json.loads(line) for line in tjs.stream_job_result_ndjson("job-1")]

    warnings = [entry for entry in parsed if entry["type"] == "warning"]
    assert warnings
    assert warnings[0]["code"] == "EVENTS_PARTIAL_FAILURE"
    assert "history" in warnings[0]["failed_domains"]
# ---------------------------------------------------------------------------
# _flatten_domain_records
# ---------------------------------------------------------------------------
def test_flatten_domain_records():
    """Per-CID event lists are flattened into one combined row list."""
    events_by_cid = {
        "CID-1": [{"CONTAINERID": "CID-1", "EVT": "A"}],
        "CID-2": [{"CONTAINERID": "CID-2", "EVT": "B"}, {"CONTAINERID": "CID-2", "EVT": "C"}],
    }
    flattened = tjs._flatten_domain_records(events_by_cid)
    assert len(flattened) == 3

View File

@@ -482,3 +482,343 @@ def test_non_msd_events_cache_unchanged(mock_cache_set, mock_cache_get, mock_fet
assert payload['stage'] == 'events'
# EventFetcher should NOT have been called — served from cache
mock_fetch_events.assert_not_called()
# ---- Admission control tests ----
def test_events_non_msd_cid_limit_returns_413(monkeypatch):
    """A non-MSD request above the CID limit is rejected with HTTP 413."""
    monkeypatch.setattr(
        'mes_dashboard.routes.trace_routes.TRACE_EVENTS_CID_LIMIT', 5,
    )
    response = _client().post(
        '/api/trace/events',
        json={
            'profile': 'query_tool',
            'container_ids': [f'CID-{i}' for i in range(10)],
            'domains': ['history'],
        },
    )
    assert response.status_code == 413
    body = response.get_json()
    assert body['error']['code'] == 'CID_LIMIT_EXCEEDED'
@patch('mes_dashboard.routes.trace_routes.build_trace_aggregation_from_events')
@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events')
def test_events_msd_bypasses_cid_limit(
    mock_fetch_events,
    mock_build_aggregation,
    monkeypatch,
):
    """The MSD profile is exempt from the CID limit and runs end to end."""
    monkeypatch.setattr(
        'mes_dashboard.routes.trace_routes.TRACE_EVENTS_CID_LIMIT', 5,
    )
    cids = [f'CID-{i}' for i in range(10)]
    mock_fetch_events.return_value = {
        cid: [{'CONTAINERID': cid, 'WORKCENTER_GROUP': '測試'}] for cid in cids
    }
    mock_build_aggregation.return_value = {
        'kpi': {'total_input': 10},
        'charts': {},
        'daily_trend': [],
        'available_loss_reasons': [],
        'genealogy_status': 'ready',
        'detail_total_count': 0,
    }
    response = _client().post(
        '/api/trace/events',
        json={
            'profile': 'mid_section_defect',
            'container_ids': cids,
            'domains': ['upstream_history'],
            'params': {
                'start_date': '2025-01-01',
                'end_date': '2025-01-31',
            },
            'lineage': {'ancestors': {}},
            'seed_container_ids': cids,
        },
    )
    assert response.status_code == 200
    body = response.get_json()
    assert body['stage'] == 'events'
    mock_fetch_events.assert_called()
@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events')
def test_events_non_msd_within_limit_proceeds(mock_fetch_events):
    """A non-MSD request under the CID limit runs the normal sync path."""
    mock_fetch_events.return_value = {
        'CID-001': [{'CONTAINERID': 'CID-001', 'EVENTTYPE': 'TRACK_IN'}]
    }
    response = _client().post(
        '/api/trace/events',
        json={
            'profile': 'query_tool',
            'container_ids': ['CID-001'],
            'domains': ['history'],
        },
    )
    assert response.status_code == 200
    body = response.get_json()
    assert body['stage'] == 'events'
# ---- Async job queue tests ----
@patch('mes_dashboard.routes.trace_routes.enqueue_trace_events_job')
@patch('mes_dashboard.routes.trace_routes.is_async_available', return_value=True)
def test_events_routes_to_async_above_threshold(
    mock_async_avail,
    mock_enqueue,
    monkeypatch,
):
    """Requests over the async CID threshold get a 202 plus a job id."""
    monkeypatch.setattr(
        'mes_dashboard.routes.trace_routes.TRACE_ASYNC_CID_THRESHOLD', 5,
    )
    mock_enqueue.return_value = ('trace-evt-abc123', None)
    response = _client().post(
        '/api/trace/events',
        json={
            'profile': 'query_tool',
            'container_ids': [f'CID-{i}' for i in range(10)],
            'domains': ['history'],
        },
    )
    assert response.status_code == 202
    body = response.get_json()
    assert body['async'] is True
    assert body['job_id'] == 'trace-evt-abc123'
    assert '/api/trace/job/trace-evt-abc123' in body['status_url']
@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events')
@patch('mes_dashboard.routes.trace_routes.is_async_available', return_value=False)
def test_events_falls_back_to_sync_when_async_unavailable(
    mock_async_avail,
    mock_fetch_events,
    monkeypatch,
):
    """With no async backend available, the request runs synchronously."""
    monkeypatch.setattr(
        'mes_dashboard.routes.trace_routes.TRACE_ASYNC_CID_THRESHOLD', 2,
    )
    cids = [f'CID-{i}' for i in range(3)]
    mock_fetch_events.return_value = {
        cid: [{'CONTAINERID': cid}] for cid in cids
    }
    response = _client().post(
        '/api/trace/events',
        json={
            'profile': 'query_tool',
            'container_ids': cids,
            'domains': ['history'],
        },
    )
    assert response.status_code == 200
    body = response.get_json()
    assert body['stage'] == 'events'
    mock_fetch_events.assert_called()
@patch('mes_dashboard.routes.trace_routes.enqueue_trace_events_job')
@patch('mes_dashboard.routes.trace_routes.is_async_available', return_value=True)
def test_events_falls_back_to_413_when_enqueue_fails(
    mock_async_avail,
    mock_enqueue,
    monkeypatch,
):
    """A failed enqueue for a non-MSD request degrades to the 413 limit path."""
    monkeypatch.setattr(
        'mes_dashboard.routes.trace_routes.TRACE_ASYNC_CID_THRESHOLD', 3,
    )
    monkeypatch.setattr(
        'mes_dashboard.routes.trace_routes.TRACE_EVENTS_CID_LIMIT', 5,
    )
    # (job_id, error) tuple: no job id means the enqueue attempt failed.
    mock_enqueue.return_value = (None, 'Redis down')
    response = _client().post(
        '/api/trace/events',
        json={
            'profile': 'query_tool',
            'container_ids': [f'CID-{i}' for i in range(10)],
            'domains': ['history'],
        },
    )
    assert response.status_code == 413
    body = response.get_json()
    assert body['error']['code'] == 'CID_LIMIT_EXCEEDED'
# ---- Job status/result endpoint tests ----
@patch('mes_dashboard.routes.trace_routes.get_job_status')
def test_job_status_found(mock_status):
    """The status endpoint relays a known job's metadata."""
    mock_status.return_value = {
        'job_id': 'trace-evt-abc',
        'status': 'started',
        'profile': 'query_tool',
        'cid_count': 100,
        'domains': ['history'],
        'progress': 'fetching events',
        'created_at': 1740000000.0,
        'elapsed_seconds': 15.0,
        'error': None,
    }
    response = _client().get('/api/trace/job/trace-evt-abc')
    assert response.status_code == 200
    body = response.get_json()
    assert body['status'] == 'started'
    assert body['job_id'] == 'trace-evt-abc'
@patch('mes_dashboard.routes.trace_routes.get_job_status', return_value=None)
def test_job_status_not_found(mock_status):
    """An unknown job id yields HTTP 404 from the status endpoint."""
    response = _client().get('/api/trace/job/trace-evt-nonexist')
    assert response.status_code == 404
@patch('mes_dashboard.routes.trace_routes.get_job_result')
@patch('mes_dashboard.routes.trace_routes.get_job_status')
def test_job_result_success(mock_status, mock_result):
    """A finished job's result endpoint returns the stored payload."""
    mock_status.return_value = {'status': 'finished', 'job_id': 'j1'}
    mock_result.return_value = {
        'stage': 'events',
        'results': {'history': {'data': [], 'count': 0}},
        'aggregation': None,
    }
    response = _client().get('/api/trace/job/j1/result')
    assert response.status_code == 200
    body = response.get_json()
    assert body['stage'] == 'events'
@patch('mes_dashboard.routes.trace_routes.get_job_status')
def test_job_result_not_complete(mock_status):
    """Requesting the result of an unfinished job yields HTTP 409."""
    mock_status.return_value = {'status': 'started', 'job_id': 'j2'}
    response = _client().get('/api/trace/job/j2/result')
    assert response.status_code == 409
    body = response.get_json()
    assert body['error']['code'] == 'JOB_NOT_COMPLETE'
@patch('mes_dashboard.routes.trace_routes.get_job_result', return_value=None)
@patch('mes_dashboard.routes.trace_routes.get_job_status')
def test_job_result_expired(mock_status, mock_result):
    """A finished job whose stored result has expired yields HTTP 404."""
    mock_status.return_value = {'status': 'finished', 'job_id': 'j3'}
    response = _client().get('/api/trace/job/j3/result')
    assert response.status_code == 404
# ---------------------------------------------------------------------------
# NDJSON stream endpoint
# ---------------------------------------------------------------------------
@patch("mes_dashboard.routes.trace_routes.stream_job_result_ndjson")
@patch("mes_dashboard.routes.trace_routes.get_job_status")
def test_job_stream_success(mock_status, mock_stream):
    """A finished job streams NDJSON lines with the NDJSON content type."""
    mock_status.return_value = {'status': 'finished', 'job_id': 'j1'}
    mock_stream.return_value = iter([
        '{"type":"meta","job_id":"j1","domains":["history"]}\n',
        '{"type":"complete","total_records":0}\n',
    ])
    reset_rate_limits_for_tests()
    response = _client().get('/api/trace/job/j1/stream')
    assert response.status_code == 200
    assert response.content_type.startswith('application/x-ndjson')
    body_lines = [
        line for line in response.data.decode().strip().split('\n') if line.strip()
    ]
    assert len(body_lines) == 2
@patch("mes_dashboard.routes.trace_routes.get_job_status")
def test_job_stream_not_found(mock_status):
    """Streaming an unknown job id yields HTTP 404."""
    mock_status.return_value = None
    reset_rate_limits_for_tests()
    response = _client().get('/api/trace/job/j-missing/stream')
    assert response.status_code == 404
@patch("mes_dashboard.routes.trace_routes.get_job_status")
def test_job_stream_not_complete(mock_status):
    """Streaming an unfinished job yields HTTP 409."""
    mock_status.return_value = {'status': 'started', 'job_id': 'j2'}
    reset_rate_limits_for_tests()
    response = _client().get('/api/trace/job/j2/stream')
    assert response.status_code == 409
# ---------------------------------------------------------------------------
# 202 response includes stream_url
# ---------------------------------------------------------------------------
@patch("mes_dashboard.routes.trace_routes.is_async_available", return_value=True)
@patch("mes_dashboard.routes.trace_routes.enqueue_trace_events_job")
def test_events_async_response_includes_stream_url(mock_enqueue, mock_async):
    """The 202 async response exposes both status_url and stream_url."""
    mock_enqueue.return_value = ("trace-evt-xyz", None)
    reset_rate_limits_for_tests()
    # Enough CIDs to exceed the default async threshold.
    response = _client().post('/api/trace/events', json={
        'profile': 'query_tool',
        'container_ids': [f"CID-{i}" for i in range(25000)],
        'domains': ['history'],
    })
    assert response.status_code == 202
    body = response.get_json()
    assert body["stream_url"] == "/api/trace/job/trace-evt-xyz/stream"
    assert body["status_url"] == "/api/trace/job/trace-evt-xyz"