From dbe0da057c53f751639dcd43eb9a33b20b76a0a4 Mon Sep 17 00:00:00 2001 From: egg Date: Wed, 25 Feb 2026 21:01:27 +0800 Subject: [PATCH] feat(trace-pipeline): memory triage, async job queue, and NDJSON streaming Three proposals addressing the 2026-02-25 trace pipeline OOM crash (114K CIDs): 1. trace-events-memory-triage: fetchmany iterator (read_sql_df_slow_iter), admission control (50K CID limit for non-MSD), cache skip for large queries, early memory release with gc.collect() 2. trace-async-job-queue: RQ-based async jobs for queries >20K CIDs, separate worker process with isolated memory, frontend polling via useTraceProgress composable, systemd service + deploy scripts 3. trace-streaming-response: chunked Redis storage (TRACE_STREAM_BATCH_SIZE=5000), NDJSON stream endpoint (GET /api/trace/job//stream), frontend ReadableStream consumer for progressive rendering, backward-compatible with legacy single-key storage All three proposals archived. 1101 tests pass, frontend builds clean. Co-Authored-By: Claude Opus 4.6 --- .env.example | 49 +- deploy/mes-dashboard-trace-worker.service | 37 ++ deploy/mes-dashboard.service | 8 + environment.yml | 3 + .../shared-composables/useTraceProgress.js | 186 +++++- .../trace-async-job-queue/.openspec.yaml | 2 + .../archive/trace-async-job-queue/design.md | 149 +++++ .../archive/trace-async-job-queue/proposal.md | 45 ++ .../specs/trace-staged-api/spec.md | 47 ++ .../archive/trace-async-job-queue/tasks.md | 39 ++ .../trace-events-memory-triage/design.md | 174 +++++ .../trace-events-memory-triage/proposal.md | 39 ++ .../specs/event-fetcher-unified/spec.md | 15 + .../specs/trace-staged-api/spec.md | 19 + .../trace-events-memory-triage/tasks.md | 37 ++ .../trace-streaming-response/.openspec.yaml | 2 + .../trace-streaming-response/design.md | 140 ++++ .../trace-streaming-response/proposal.md | 39 ++ .../specs/event-fetcher-unified/spec.md | 14 + .../specs/trace-staged-api/spec.md | 22 + .../archive/trace-streaming-response/tasks.md | 35 + requirements.txt | 3 + scripts/deploy.sh | 5 + scripts/start_server.sh | 128 ++++ src/mes_dashboard/core/database.py | 102 +++ src/mes_dashboard/routes/trace_routes.py | 123 +++- src/mes_dashboard/services/event_fetcher.py | 61 +- .../services/trace_job_service.py | 616 ++++++++++++++++++ tests/test_database_slow_iter.py | 119 ++++ tests/test_event_fetcher.py | 122 ++-- tests/test_trace_job_service.py | 507 ++++++++++++++ tests/test_trace_routes.py | 340 ++++++++++ 32 files changed, 3140 insertions(+), 87 deletions(-) create mode 100644 deploy/mes-dashboard-trace-worker.service create mode 100644 openspec/changes/archive/trace-async-job-queue/.openspec.yaml create mode 100644 openspec/changes/archive/trace-async-job-queue/design.md create mode 100644 openspec/changes/archive/trace-async-job-queue/proposal.md create mode 100644 openspec/changes/archive/trace-async-job-queue/specs/trace-staged-api/spec.md create mode 100644 openspec/changes/archive/trace-async-job-queue/tasks.md create mode 100644 openspec/changes/archive/trace-events-memory-triage/design.md create mode 100644 openspec/changes/archive/trace-events-memory-triage/proposal.md create mode 100644 openspec/changes/archive/trace-events-memory-triage/specs/event-fetcher-unified/spec.md create mode 100644 openspec/changes/archive/trace-events-memory-triage/specs/trace-staged-api/spec.md create mode 100644 openspec/changes/archive/trace-events-memory-triage/tasks.md create mode 100644 openspec/changes/archive/trace-streaming-response/.openspec.yaml create mode 100644 
openspec/changes/archive/trace-streaming-response/design.md create mode 100644 openspec/changes/archive/trace-streaming-response/proposal.md create mode 100644 openspec/changes/archive/trace-streaming-response/specs/event-fetcher-unified/spec.md create mode 100644 openspec/changes/archive/trace-streaming-response/specs/trace-staged-api/spec.md create mode 100644 openspec/changes/archive/trace-streaming-response/tasks.md create mode 100644 src/mes_dashboard/services/trace_job_service.py create mode 100644 tests/test_database_slow_iter.py create mode 100644 tests/test_trace_job_service.py diff --git a/.env.example b/.env.example index d6a1bfa..295dc43 100644 --- a/.env.example +++ b/.env.example @@ -85,7 +85,9 @@ LOCAL_AUTH_PASSWORD= # Server bind address and port GUNICORN_BIND=0.0.0.0:8080 -# Number of worker processes (recommend: 2 * CPU cores + 1) +# Number of worker processes +# Recommend: 2 for ≤ 8GB RAM (trace queries consume 2-3 GB peak per worker) +# Recommend: 4 for ≥ 16GB RAM GUNICORN_WORKERS=2 # Threads per worker @@ -168,14 +170,55 @@ CIRCUIT_BREAKER_WINDOW_SIZE=10 TRACE_SLOW_THRESHOLD_SECONDS=15 # Max parallel workers for events domain fetching (per request) -TRACE_EVENTS_MAX_WORKERS=4 +# Recommend: 2 (each worker × EVENT_FETCHER_MAX_WORKERS = peak slow query slots) +TRACE_EVENTS_MAX_WORKERS=2 # Max parallel workers for EventFetcher batch queries (per domain) -EVENT_FETCHER_MAX_WORKERS=4 +# Recommend: 2 (peak concurrent slow queries = TRACE_EVENTS_MAX_WORKERS × this) +EVENT_FETCHER_MAX_WORKERS=2 # Max parallel workers for forward pipeline WIP+rejects fetching FORWARD_PIPELINE_MAX_WORKERS=2 +# --- Admission Control (提案 1: trace-events-memory-triage) --- +# Max container IDs per synchronous events request. +# Requests exceeding this limit return HTTP 413 (or HTTP 202 when async job queue is enabled). +# Set based on available RAM: 50K CIDs ≈ 2-3 GB peak memory per request. +TRACE_EVENTS_CID_LIMIT=50000 + +# Cursor fetchmany batch size for slow query iterator mode. +# Smaller = less peak memory; larger = fewer Oracle round-trips. +DB_SLOW_FETCHMANY_SIZE=5000 + +# Domain-level cache skip threshold (CID count). +# When CID count exceeds this, per-domain and route-level cache writes are skipped. +EVENT_FETCHER_CACHE_SKIP_CID_THRESHOLD=10000 + +# --- Async Job Queue (提案 2: trace-async-job-queue) --- +# Enable RQ trace worker for async large query processing +# Set to true and start the worker: rq worker trace-events +TRACE_WORKER_ENABLED=false + +# CID threshold for automatic async job routing (requires RQ worker). +# Requests with CID count > threshold are queued instead of processed synchronously. 
+TRACE_ASYNC_CID_THRESHOLD=20000 + +# Job result retention time in seconds (default: 3600 = 1 hour) +TRACE_JOB_TTL_SECONDS=3600 + +# Job execution timeout in seconds (default: 1800 = 30 minutes) +TRACE_JOB_TIMEOUT_SECONDS=1800 + +# Number of RQ worker processes for trace jobs +TRACE_WORKER_COUNT=1 + +# RQ queue name for trace jobs +TRACE_WORKER_QUEUE=trace-events + +# --- Streaming Response (提案 3: trace-streaming-response) --- +# NDJSON stream batch size (records per NDJSON line) +TRACE_STREAM_BATCH_SIZE=5000 + # ============================================================ # Performance Metrics Configuration # ============================================================ diff --git a/deploy/mes-dashboard-trace-worker.service b/deploy/mes-dashboard-trace-worker.service new file mode 100644 index 0000000..c38b7bb --- /dev/null +++ b/deploy/mes-dashboard-trace-worker.service @@ -0,0 +1,37 @@ +[Unit] +Description=MES Dashboard Trace Worker (RQ, Conda Runtime) +Documentation=https://github.com/your-org/mes-dashboard +After=network.target redis-server.service +Requires=redis-server.service + +[Service] +Type=simple +User=www-data +Group=www-data +WorkingDirectory=/opt/mes-dashboard +EnvironmentFile=-/opt/mes-dashboard/.env +Environment="PYTHONPATH=/opt/mes-dashboard/src" + +ExecStart=/usr/bin/env bash -lc 'exec "${CONDA_BIN:-/opt/miniconda3/bin/conda}" run --no-capture-output -n "${CONDA_ENV_NAME:-mes-dashboard}" rq worker "${TRACE_WORKER_QUEUE:-trace-events}" --url "${REDIS_URL:-redis://localhost:6379/0}"' + +KillSignal=SIGTERM +TimeoutStopSec=60 +Restart=always +RestartSec=10 + +StandardOutput=journal +StandardError=journal +SyslogIdentifier=mes-dashboard-trace-worker + +# Memory protection: trace worker handles large queries independently. +# MemoryMax prevents single large job from killing the VM. +MemoryHigh=3G +MemoryMax=4G + +NoNewPrivileges=yes +PrivateTmp=yes +ProtectSystem=strict +ReadWritePaths=/opt/mes-dashboard/logs + +[Install] +WantedBy=multi-user.target diff --git a/deploy/mes-dashboard.service b/deploy/mes-dashboard.service index 729d0a8..ad66b7a 100644 --- a/deploy/mes-dashboard.service +++ b/deploy/mes-dashboard.service @@ -27,6 +27,14 @@ StandardOutput=journal StandardError=journal SyslogIdentifier=mes-dashboard +# Memory protection (cgroup v2): prevents OOM from killing entire VM. +# MemoryHigh: soft limit — kernel starts reclaiming when exceeded (service stays alive). +# MemoryMax: hard limit — OOM kills only this service (host OS survives). +# Adjust based on VM RAM: MemoryHigh ≈ 70% of VM RAM, MemoryMax ≈ 85% of VM RAM. +# For 7GB VM: MemoryHigh=5G, MemoryMax=6G (leaves ~1GB for OS + Redis). 
+MemoryHigh=5G +MemoryMax=6G + NoNewPrivileges=yes PrivateTmp=yes ProtectSystem=strict diff --git a/environment.yml b/environment.yml index b3ca7b2..9a4ae9a 100644 --- a/environment.yml +++ b/environment.yml @@ -34,6 +34,9 @@ dependencies: - redis>=5.0.0,<6.0.0 - hiredis>=2.0.0,<4.0.0 # C parser for better performance + # Task Queue (async trace jobs) + - rq>=1.16.0,<2.0.0 + # HTTP Client - requests>=2.28.0,<3.0.0 diff --git a/frontend/src/shared-composables/useTraceProgress.js b/frontend/src/shared-composables/useTraceProgress.js index 9dc50b0..8f278e1 100644 --- a/frontend/src/shared-composables/useTraceProgress.js +++ b/frontend/src/shared-composables/useTraceProgress.js @@ -1,10 +1,12 @@ import { reactive, ref } from 'vue'; -import { apiPost, ensureMesApiAvailable } from '../core/api.js'; +import { apiGet, apiPost, ensureMesApiAvailable } from '../core/api.js'; ensureMesApiAvailable(); const DEFAULT_STAGE_TIMEOUT_MS = 360000; +const JOB_POLL_INTERVAL_MS = 3000; +const JOB_POLL_MAX_MS = 1800000; // 30 minutes const PROFILE_DOMAINS = Object.freeze({ query_tool: ['history', 'materials', 'rejects', 'holds', 'jobs'], mid_section_defect: ['upstream_history', 'materials'], @@ -71,6 +73,107 @@ function collectAllContainerIds(seedContainerIds, lineagePayload, direction) { return merged; } +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Poll an async trace job until it completes or fails. + * + * @param {string} statusUrl - The job status endpoint URL + * @param {object} options - { signal, onProgress } + * @returns {Promise} The final status ('finished' or throws) + */ +async function pollJobUntilComplete(statusUrl, { signal, onProgress } = {}) { + const started = Date.now(); + + while (true) { + if (signal?.aborted) { + throw new DOMException('Aborted', 'AbortError'); + } + + const status = await apiGet(statusUrl, { timeout: 15000, signal }); + + if (typeof onProgress === 'function') { + onProgress(status); + } + + if (status.status === 'finished') { + return 'finished'; + } + + if (status.status === 'failed') { + const error = new Error(status.error || '非同步查詢失敗'); + error.errorCode = 'JOB_FAILED'; + throw error; + } + + if (Date.now() - started > JOB_POLL_MAX_MS) { + const error = new Error('非同步查詢超時'); + error.errorCode = 'JOB_POLL_TIMEOUT'; + throw error; + } + + await sleep(JOB_POLL_INTERVAL_MS); + } +} + +/** + * Consume an NDJSON stream from the server, calling onChunk for each line. 
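+ * Each complete line is parsed with JSON.parse and passed to onChunk; lines that fail to parse are skipped rather than aborting the stream (see the catch blocks below).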
+ * + * @param {string} url - The stream endpoint URL + * @param {object} options - { signal, onChunk } + * @returns {Promise} + */ +async function consumeNDJSONStream(url, { signal, onChunk } = {}) { + const response = await fetch(url, { signal }); + + if (!response.ok) { + const text = await response.text().catch(() => ''); + const error = new Error(`Stream request failed: HTTP ${response.status} ${text}`); + error.errorCode = 'STREAM_FAILED'; + throw error; + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop(); // keep incomplete last line + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const chunk = JSON.parse(trimmed); + if (typeof onChunk === 'function') onChunk(chunk); + } catch { + // skip malformed NDJSON lines + } + } + } + + // process remaining buffer + if (buffer.trim()) { + try { + const chunk = JSON.parse(buffer.trim()); + if (typeof onChunk === 'function') onChunk(chunk); + } catch { + // skip malformed final line + } + } + } finally { + reader.releaseLock(); + } +} + export function useTraceProgress({ profile } = {}) { const current_stage = ref(null); const completed_stages = ref([]); @@ -88,6 +191,15 @@ export function useTraceProgress({ profile } = {}) { events: null, }); + // Async job progress (populated when events stage uses async path) + const job_progress = reactive({ + active: false, + job_id: null, + status: null, + elapsed_seconds: 0, + progress: '', + }); + let activeController = null; function reset() { @@ -99,6 +211,11 @@ export function useTraceProgress({ profile } = {}) { stage_errors.seed = null; stage_errors.lineage = null; stage_errors.events = null; + job_progress.active = false; + job_progress.job_id = null; + job_progress.status = null; + job_progress.elapsed_seconds = 0; + job_progress.progress = ''; } function abort() { @@ -173,7 +290,70 @@ export function useTraceProgress({ profile } = {}) { }, { timeout: DEFAULT_STAGE_TIMEOUT_MS, signal: controller.signal }, ); - stage_results.events = eventsPayload; + + // Async path: server returned 202 with job_id + if (eventsPayload?.async === true && eventsPayload?.status_url) { + job_progress.active = true; + job_progress.job_id = eventsPayload.job_id; + job_progress.status = 'queued'; + + // Phase 1: poll until job finishes + await pollJobUntilComplete(eventsPayload.status_url, { + signal: controller.signal, + onProgress: (status) => { + job_progress.status = status.status; + job_progress.elapsed_seconds = status.elapsed_seconds || 0; + job_progress.progress = status.progress || ''; + }, + }); + + // Phase 2: stream result via NDJSON (or fall back to full result) + const streamUrl = eventsPayload.stream_url; + if (streamUrl) { + job_progress.progress = 'streaming'; + + const streamedResult = { stage: 'events', results: {}, aggregation: null }; + let totalRecords = 0; + + await consumeNDJSONStream(streamUrl, { + signal: controller.signal, + onChunk: (chunk) => { + if (chunk.type === 'domain_start') { + streamedResult.results[chunk.domain] = { data: [], count: 0, total: chunk.total }; + } else if (chunk.type === 'records' && streamedResult.results[chunk.domain]) { + const domainResult = streamedResult.results[chunk.domain]; + domainResult.data.push(...chunk.data); + domainResult.count = 
domainResult.data.length; + totalRecords += chunk.data.length; + job_progress.progress = `streaming: ${totalRecords} records`; + } else if (chunk.type === 'aggregation') { + streamedResult.aggregation = chunk.data; + } else if (chunk.type === 'warning') { + streamedResult.error = chunk.code; + streamedResult.failed_domains = chunk.failed_domains; + } else if (chunk.type === 'full_result') { + // Legacy fallback: server sent full result as single chunk + Object.assign(streamedResult, chunk.data); + } + }, + }); + + stage_results.events = streamedResult; + } else { + // No stream_url: fall back to fetching full result + const resultUrl = `${eventsPayload.status_url}/result`; + stage_results.events = await apiGet(resultUrl, { + timeout: DEFAULT_STAGE_TIMEOUT_MS, + signal: controller.signal, + }); + } + + job_progress.active = false; + } else { + // Sync path + stage_results.events = eventsPayload; + } + completed_stages.value = [...completed_stages.value, 'events']; return stage_results; } catch (error) { @@ -192,6 +372,7 @@ export function useTraceProgress({ profile } = {}) { } current_stage.value = null; is_running.value = false; + job_progress.active = false; } } @@ -200,6 +381,7 @@ export function useTraceProgress({ profile } = {}) { completed_stages, stage_results, stage_errors, + job_progress, is_running, execute, reset, diff --git a/openspec/changes/archive/trace-async-job-queue/.openspec.yaml b/openspec/changes/archive/trace-async-job-queue/.openspec.yaml new file mode 100644 index 0000000..e331c97 --- /dev/null +++ b/openspec/changes/archive/trace-async-job-queue/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-02-25 diff --git a/openspec/changes/archive/trace-async-job-queue/design.md b/openspec/changes/archive/trace-async-job-queue/design.md new file mode 100644 index 0000000..1cc0e71 --- /dev/null +++ b/openspec/changes/archive/trace-async-job-queue/design.md @@ -0,0 +1,149 @@ +## Context + +提案 1(trace-events-memory-triage)解決了峰值記憶體問題並加入 admission control, +但 CID > 50K 的查詢被直接拒絕(HTTP 413)。 +使用者仍有合理需求查詢大範圍資料(例如 TMTT 站 5 個月 = 114K CIDs)。 + +目前 codebase 完全沒有非同步任務基礎設施(無 Celery、RQ、Dramatiq)。 +所有操作都是同步 request-response,受 gunicorn 360s timeout 硬限。 + +需要引入輕量級 job queue,讓大查詢在獨立 worker 進程中執行, +不佔 gunicorn thread、不受 360s timeout 限制、失敗可重試。 + +## Goals / Non-Goals + +**Goals:** +- CID > 閾值的 trace events 查詢改走非同步 job(API 回 202 + job_id) +- 獨立 worker 進程(systemd unit),不佔 gunicorn 資源 +- Job 狀態可查詢(queued/running/completed/failed) +- 結果有 TTL 自動清理,不佔 Redis 長期記憶體 +- 前端自動判斷同步/非同步路徑,顯示 job 進度 +- 最小新依賴(利用既有 Redis) + +**Non-Goals:** +- 不做通用 task queue(只處理 trace events) +- 不做 job 重試(大查詢重試消耗巨大,失敗後使用者手動重新觸發) +- 不做 job 取消(Oracle 查詢一旦發出難以取消) +- 不做 job 持久化到 DB(Redis TTL 足夠) +- 不修改 lineage 階段(仍然同步,通常 < 120s) + +## Decisions + +### D1: RQ(Redis Queue)而非 Celery/Dramatiq + +**決策**:使用 RQ 作為 job queue。 + +**理由**: +- 專案已有 Redis,零額外基礎設施 +- RQ 比 Celery 輕量 10 倍(無 broker 中間層、無 beat scheduler、無 flower) +- RQ worker 是獨立 Python 進程,記憶體隔離 +- API 簡單:`queue.enqueue(func, args, job_timeout=600, result_ttl=3600)` +- 社群活躍,Flask 生態整合良好 + +**替代方案**: +- Celery:過重,專案不需要 beat、chord、chain 等功能 → rejected +- Dramatiq:更輕量但社群較小,Redis broker 整合不如 RQ 成熟 → rejected +- 自製 threading:前面討論已排除(worker 生命週期、記憶體競爭)→ rejected + +### D2: 同步/非同步分界閾值 + +**決策**: + +| CID 數量 | 行為 | +|-----------|------| +| ≤ 20,000 | 同步處理(現有 events endpoint) | +| 20,001 ~ 50,000 | 非同步 job(回 202 + job_id) | +| > 50,000 | 非同步 job(回 202 + job_id),worker 內部分段處理 | + +**env var**:`TRACE_ASYNC_CID_THRESHOLD`(預設 20000) + +**理由**: +- ≤ 20K CIDs 的 events 查詢通常在 60s 
內完成,同步足夠 +- 20K-50K 需要 2-5 分鐘,超出使用者耐心且佔住 gunicorn thread +- > 50K 是提案 1 的 admission control 上限,必須走非同步 + +**提案 1 的 HTTP 413 改為 HTTP 202**: +當提案 2 實作完成後,提案 1 的 `TRACE_EVENTS_CID_LIMIT` 檢查改為自動 fallback 到 async job, +不再拒絕請求。 + +### D3: Job 狀態與結果儲存 + +**決策**:使用 RQ 內建的 job 狀態追蹤(儲存在 Redis)。 + +``` +Job lifecycle: + queued → started → finished / failed + +Redis keys: + rq:job:{job_id} # RQ 內建 job metadata + trace:job:{job_id}:meta # 自訂 metadata(profile, cid_count, domains, progress) + trace:job:{job_id}:result # 完成後的結果(JSON,設 TTL) +``` + +**env vars**: +- `TRACE_JOB_TTL_SECONDS`:結果保留時間(預設 3600 = 1 小時) +- `TRACE_JOB_TIMEOUT_SECONDS`:單一 job 最大執行時間(預設 1800 = 30 分鐘) + +### D4: API 設計 + +``` +POST /api/trace/events ← 現有,CID ≤ 閾值時同步 +POST /api/trace/events ← CID > 閾值時回 202 + job_id(同一 endpoint) +GET /api/trace/job/{job_id} ← 查詢 job 狀態 +GET /api/trace/job/{job_id}/result ← 取得完整結果 +GET /api/trace/job/{job_id}/result?domain=history&offset=0&limit=5000 ← 分頁取結果 +``` + +**202 回應格式**: +```json +{ + "stage": "events", + "async": true, + "job_id": "trace-evt-abc123", + "status_url": "/api/trace/job/trace-evt-abc123", + "estimated_seconds": 300 +} +``` + +### D5: Worker 部署架構 + +``` +systemd (mes-dashboard-trace-worker.service) + → conda run -n mes-dashboard rq worker trace-events --with-scheduler + → 獨立進程,獨立記憶體空間 + → MemoryMax=4G(cgroup 保護) +``` + +**env vars**: +- `TRACE_WORKER_COUNT`:worker 進程數(預設 1) +- `TRACE_WORKER_QUEUE`:queue 名稱(預設 `trace-events`) + +### D6: 前端整合 + +`useTraceProgress.js` 修改: + +```javascript +// events 階段 +const eventsResp = await fetchStage('events', payload) +if (eventsResp.status === 202) { + // 非同步路徑 + const { job_id, status_url } = eventsResp.data + return await pollJobUntilComplete(status_url, { + onProgress: (status) => updateProgress('events', status.progress), + pollInterval: 3000, + maxPollTime: 1800000, // 30 分鐘 + }) +} +// 同步路徑(現有) +return eventsResp.data +``` + +## Risks / Trade-offs + +| 風險 | 緩解措施 | +|------|---------| +| RQ 新依賴增加維護成本 | RQ 穩定、API 簡單、只用核心功能 | +| Worker 進程增加記憶體使用 | 獨立 cgroup MemoryMax=4G;空閒時幾乎不佔記憶體 | +| Redis 儲存大結果影響效能 | 結果 TTL=1h 自動清理;配合提案 3 串流取代全量儲存 | +| Worker crash 丟失進行中 job | RQ 內建 failed job registry;使用者可手動重觸發 | +| 前端輪詢增加 API 負載 | pollInterval=3s,只有 active job 才輪詢 | diff --git a/openspec/changes/archive/trace-async-job-queue/proposal.md b/openspec/changes/archive/trace-async-job-queue/proposal.md new file mode 100644 index 0000000..76434b2 --- /dev/null +++ b/openspec/changes/archive/trace-async-job-queue/proposal.md @@ -0,0 +1,45 @@ +## Why + +trace pipeline 處理大量 CIDs(> 20K)時,即使經過分批處理優化(提案 1), +仍然面臨以下根本問題: + +1. **同步 request-response 模型**:gunicorn 360s timeout 是硬限,lineage + events 合計可能超過 300s +2. **worker thread 被佔住**:大查詢期間 1 個 gunicorn thread 完全被佔用,降低即時頁服務能力 +3. **前端無進度回饋**:使用者只能盯著 loading spinner 等 5-6 分鐘,不知道是否正常運作 +4. 
**失敗後需完全重新執行**:中途 timeout/OOM 後,已完成的 seed-resolve 和 lineage 結果全部浪費 + +業界標準做法是將長時間任務放入非同步佇列(RQ/Dramatiq),API 先回 202 + job_id, +背景 worker 獨立處理,前端輪詢或 SSE 取得結果。 + +## What Changes + +- **引入 RQ(Redis Queue)**:利用既有 Redis 基礎設施,最小化新依賴 +- **新增 trace job worker**:獨立進程(systemd unit),不佔 gunicorn worker 資源 +- **新增 `POST /api/trace/events-async`**:CID > 閾值時回 202 + job_id +- **新增 `GET /api/trace/job/{job_id}`**:輪詢 job 狀態(queued/running/completed/failed) +- **新增 `GET /api/trace/job/{job_id}/result`**:取得完成後的結果(分頁) +- **前端 useTraceProgress 整合**:自動判斷同步/非同步路徑,顯示 job 進度 +- **Job TTL + 自動清理**:結果保留 1 小時後自動過期 +- **新增 systemd unit**:`mes-dashboard-trace-worker.service` +- **更新 .env.example**:`TRACE_ASYNC_CID_THRESHOLD`、`TRACE_JOB_TTL_SECONDS`、`TRACE_WORKER_COUNT` + +## Capabilities + +### New Capabilities + +- `trace-async-job`: 非同步 trace job 佇列(RQ + Redis) + +### Modified Capabilities + +- `trace-staged-api`: events endpoint 整合 async job 路由 +- `progressive-trace-ux`: 前端整合 job 輪詢 + 進度顯示 + +## Impact + +- **新增依賴**:`rq>=1.16.0,<2.0.0`(requirements.txt、environment.yml) +- **後端新增**:trace_job_service.py、trace_routes.py(async endpoints) +- **前端修改**:useTraceProgress.js(async 整合) +- **部署新增**:deploy/mes-dashboard-trace-worker.service、scripts/start_server.sh(worker 管理) +- **部署設定**:.env.example(新 env vars) +- **不影響**:其他 service、即時監控頁、admin 頁面 +- **前置條件**:trace-events-memory-triage(提案 1) diff --git a/openspec/changes/archive/trace-async-job-queue/specs/trace-staged-api/spec.md b/openspec/changes/archive/trace-async-job-queue/specs/trace-staged-api/spec.md new file mode 100644 index 0000000..ab98e26 --- /dev/null +++ b/openspec/changes/archive/trace-async-job-queue/specs/trace-staged-api/spec.md @@ -0,0 +1,47 @@ +## ADDED Requirements + +### Requirement: Trace events endpoint SHALL support asynchronous job execution +The `/api/trace/events` endpoint SHALL automatically route large CID requests to an async job queue. + +#### Scenario: CID count exceeds async threshold +- **WHEN** the events endpoint receives a request with `container_ids` count exceeding `TRACE_ASYNC_CID_THRESHOLD` (env: `TRACE_ASYNC_CID_THRESHOLD`, default: 20000) +- **THEN** the endpoint SHALL enqueue the request to the `trace-events` RQ queue +- **THEN** the endpoint SHALL return HTTP 202 with `{ "stage": "events", "async": true, "job_id": "...", "status_url": "/api/trace/job/{job_id}" }` + +#### Scenario: CID count within sync threshold +- **WHEN** the events endpoint receives a request with `container_ids` count ≤ `TRACE_ASYNC_CID_THRESHOLD` +- **THEN** the endpoint SHALL process synchronously as before + +### Requirement: Trace API SHALL expose job status endpoint +`GET /api/trace/job/{job_id}` SHALL return the current status of an async trace job. + +#### Scenario: Job status query +- **WHEN** a client queries job status with a valid job_id +- **THEN** the endpoint SHALL return `{ "job_id": "...", "status": "queued|started|finished|failed", "progress": {...}, "created_at": "...", "elapsed_seconds": N }` + +#### Scenario: Job not found +- **WHEN** a client queries job status with an unknown or expired job_id +- **THEN** the endpoint SHALL return HTTP 404 with `{ "error": "...", "code": "JOB_NOT_FOUND" }` + +### Requirement: Trace API SHALL expose job result endpoint +`GET /api/trace/job/{job_id}/result` SHALL return the result of a completed async trace job. 
+ +#### Scenario: Completed job result +- **WHEN** a client requests result for a completed job +- **THEN** the endpoint SHALL return the same response format as the synchronous events endpoint +- **THEN** optional query params `domain`, `offset`, `limit` SHALL support pagination + +#### Scenario: Job not yet completed +- **WHEN** a client requests result for a non-completed job +- **THEN** the endpoint SHALL return HTTP 409 with `{ "error": "...", "code": "JOB_NOT_COMPLETE", "status": "queued|started" }` + +### Requirement: Async trace jobs SHALL have TTL and timeout +Job results SHALL expire after a configurable TTL, and execution SHALL be bounded by a timeout. + +#### Scenario: Job result TTL +- **WHEN** a trace job completes (success or failure) +- **THEN** the result SHALL be stored in Redis with TTL = `TRACE_JOB_TTL_SECONDS` (env, default: 3600) + +#### Scenario: Job execution timeout +- **WHEN** a trace job exceeds `TRACE_JOB_TIMEOUT_SECONDS` (env, default: 1800) +- **THEN** RQ SHALL terminate the job and mark it as failed diff --git a/openspec/changes/archive/trace-async-job-queue/tasks.md b/openspec/changes/archive/trace-async-job-queue/tasks.md new file mode 100644 index 0000000..40122a2 --- /dev/null +++ b/openspec/changes/archive/trace-async-job-queue/tasks.md @@ -0,0 +1,39 @@ +## 1. Dependencies + +- [x] 1.1 Add `rq>=1.16.0,<2.0.0` to `requirements.txt` +- [x] 1.2 Add `rq>=1.16.0,<2.0.0` to pip dependencies in `environment.yml` + +## 2. Trace Job Service + +- [x] 2.1 Create `src/mes_dashboard/services/trace_job_service.py` with `enqueue_trace_events_job()`, `get_job_status()`, `get_job_result()` +- [x] 2.2 Implement `execute_trace_events_job()` function (RQ worker entry point): runs EventFetcher + optional MSD aggregation, stores result in Redis with TTL +- [x] 2.3 Add job metadata tracking: `trace:job:{job_id}:meta` Redis key with `{profile, cid_count, domains, status, progress, created_at, completed_at}` +- [x] 2.4 Add unit tests for trace_job_service (13 tests: enqueue, status, result, worker execution, flatten) + +## 3. Async API Endpoints + +- [x] 3.1 Modify `events()` in `trace_routes.py`: when `len(container_ids) > TRACE_ASYNC_CID_THRESHOLD` and async available, call `enqueue_trace_events_job()` and return HTTP 202 +- [x] 3.2 Add `GET /api/trace/job/` endpoint: return job status from `get_job_status()` +- [x] 3.3 Add `GET /api/trace/job//result` endpoint: return job result from `get_job_result()` with optional `domain`, `offset`, `limit` query params +- [x] 3.4 Add rate limiting to job status/result endpoints (60 req/60s) +- [x] 3.5 Add unit tests for async endpoints (8 tests: async routing, sync fallback, 413 fallback, job status/result) + +## 4. Deployment + +- [x] 4.1 Create `deploy/mes-dashboard-trace-worker.service` systemd unit (MemoryHigh=3G, MemoryMax=4G) +- [x] 4.2 Update `scripts/start_server.sh`: add `start_rq_worker`/`stop_rq_worker`/`rq_worker_status` functions +- [x] 4.3 Update `scripts/deploy.sh`: add trace worker systemd install instructions +- [x] 4.4 Update `.env.example`: uncomment and add `TRACE_WORKER_ENABLED`, `TRACE_ASYNC_CID_THRESHOLD`, `TRACE_JOB_TTL_SECONDS`, `TRACE_JOB_TIMEOUT_SECONDS`, `TRACE_WORKER_COUNT`, `TRACE_WORKER_QUEUE` + +## 5. 
Frontend Integration + +- [x] 5.1 Modify `useTraceProgress.js`: detect async response (`eventsPayload.async === true`), switch to job polling mode +- [x] 5.2 Add `pollJobUntilComplete()` helper: poll `GET /api/trace/job/{job_id}` every 3s, max 30 minutes +- [x] 5.3 Add `job_progress` reactive state for UI: `{ active, job_id, status, elapsed_seconds, progress }` +- [x] 5.4 Add error handling: job failed (`JOB_FAILED`), polling timeout (`JOB_POLL_TIMEOUT`), abort support + +## 6. Verification + +- [x] 6.1 Run `python -m pytest tests/ -v` — 1090 passed, 152 skipped +- [x] 6.2 Run `cd frontend && npm run build` — frontend builds successfully +- [x] 6.3 Verify rq installed: `python -c "import rq; print(rq.VERSION)"` → 1.16.2 diff --git a/openspec/changes/archive/trace-events-memory-triage/design.md b/openspec/changes/archive/trace-events-memory-triage/design.md new file mode 100644 index 0000000..70c9d43 --- /dev/null +++ b/openspec/changes/archive/trace-events-memory-triage/design.md @@ -0,0 +1,174 @@ +## Context + +2026-02-25 生產環境 OOM crash 時間線: + +``` +13:18:15 seed-resolve (read_sql_df_slow): 525K rows → 70K lots (38.95s) +13:20:12 lineage (read_sql_df_slow): 114K CIDs, 54MB JSON (65s) +13:20:16 events (read_sql_df_slow): 2 domains × 115 batches × 2 workers +13:20:16 cursor.fetchall() 開始累積 rows → DataFrame → dict → grouped + 每個 domain 同時持有 ~3 份完整資料副本 + 峰值記憶體: (fetchall rows + DataFrame + grouped dict) × 2 domains ≈ 4-6 GB +13:37:47 OOM SIGKILL — 7GB VM, 0 swap +``` + +pool 隔離(前一個 change)解決了連線互搶問題,但 events 階段的記憶體使用才是 OOM 根因。 + +目前 `read_sql_df_slow` 使用 `cursor.fetchall()` 一次載入全部結果到 Python list, +然後建 `pd.DataFrame`,再 `iterrows()` + `to_dict()` 轉成 dict list。 +114K CIDs 的 upstream_history domain 可能回傳 100 萬+ rows, +每份副本數百 MB,3-4 份同時存在就超過 VM 記憶體。 + +## Goals / Non-Goals + +**Goals:** +- 防止大查詢直接 OOM 殺死整台 VM(admission control) +- 降低 events 階段峰值記憶體 60-70%(fetchmany + 跳過 DataFrame) +- 保護 host OS 穩定性(systemd MemoryMax + workers 降為 2) +- 不改變現有 API 回傳格式(對前端透明) +- 更新部署文件和 env 設定 + +**Non-Goals:** +- 不引入非同步任務佇列(提案 2 範圍) +- 不修改 lineage 階段(54MB 在可接受範圍) +- 不修改前端(提案 3 範圍) +- 不限制使用者查詢範圍(日期/站別由使用者決定) + +## Decisions + +### D1: Admission Control 閾值與行為 + +**決策**:在 trace events endpoint 加入 CID 數量上限判斷,**依 profile 區分**。 + +| Profile | CID 數量 | 行為 | +|---------|-----------|------| +| `query_tool` / `query_tool_reverse` | ≤ 50,000 | 正常同步處理 | +| `query_tool` / `query_tool_reverse` | > 50,000 | 回 HTTP 413(實務上不會發生) | +| `mid_section_defect` | 任意 | **不設硬限**,正常處理 + log warning(CID > 50K 時) | + +**env var**:`TRACE_EVENTS_CID_LIMIT`(預設 50000,僅對非 MSD profile 生效) + +**MSD 不設硬限的理由**: +- MSD 報廢追溯是聚合統計(pareto/表格),不渲染追溯圖,CID 數量多寡不影響可讀性 +- 漏掉 CID 會導致報廢數量統計失準,資料完整性至關重要 +- 114K CIDs 是真實業務場景(TMTT 站 5 個月),不能拒絕 +- OOM 風險由 systemd `MemoryMax=6G` 保護 host OS(service 被殺但 VM 存活,自動重啟) +- 提案 2 實作後,MSD 大查詢自動走 async job,記憶體問題根本解決 + +**query_tool 設 50K 上限的理由**: +- 追溯圖超過數千節點已無法閱讀,50K 是極寬鬆的安全閥 +- 實務上 query_tool seed 通常 1-50 lots → lineage 後幾百到幾千 CIDs + +**替代方案**: +- 全 profile 統一上限 → MSD 被擋住,報廢統計不完整 → rejected +- 無上限 + 只靠 fetchmany → MSD 接受此風險(有 MemoryMax 保護)→ adopted for MSD +- 上限設太低(如 10K)→ 影響正常 MSD 查詢(通常 5K-30K CIDs)→ rejected + +### D2: fetchmany 取代 fetchall + +**決策**:`read_sql_df_slow` 新增 `fetchmany` 模式,不建 DataFrame,直接回傳 iterator。 + +```python +def read_sql_df_slow_iter(sql, params=None, timeout_seconds=None, batch_size=5000): + """Yield batches of (columns, rows) without building DataFrame.""" + # ... connect, execute ... 
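+    # (sketch of the elided connect/execute step, assuming the same oracledb calls used by the
+    #  actual read_sql_df_slow_iter implementation later in this patch — not part of the original design text)
+    # conn = oracledb.connect(**DB_CONFIG); conn.call_timeout = timeout_ms
+    # cursor = conn.cursor(); cursor.execute(sql, params or {})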
+ columns = [desc[0].upper() for desc in cursor.description] + while True: + rows = cursor.fetchmany(batch_size) + if not rows: + break + yield columns, rows + # ... cleanup in finally ... +``` + +**env var**:`DB_SLOW_FETCHMANY_SIZE`(預設 5000) + +**理由**: +- `fetchall()` 強制全量 materialization +- `fetchmany(5000)` 每次只持有 5000 rows 在記憶體 +- 不建 DataFrame 省去 pandas overhead(index、dtype inference、NaN handling) +- EventFetcher 可以 yield 完一批就 group 到結果 dict,釋放 batch + +**trade-off**: +- `read_sql_df_slow`(回傳 DataFrame)保留不動,新增 `read_sql_df_slow_iter` +- 只有 EventFetcher 使用 iter 版本;其他 service 繼續用 DataFrame 版本 +- 這樣不影響任何既有 consumer + +### D3: EventFetcher 逐批 group 策略 + +**決策**:`_fetch_batch` 改用 `read_sql_df_slow_iter`,每 fetchmany batch 立刻 group 到 `grouped` dict。 + +```python +def _fetch_batch(batch_ids): + builder = QueryBuilder() + builder.add_in_condition(filter_column, batch_ids) + sql = EventFetcher._build_domain_sql(domain, builder.get_conditions_sql()) + + for columns, rows in read_sql_df_slow_iter(sql, builder.params, timeout_seconds=60): + for row in rows: + record = dict(zip(columns, row)) + # sanitize NaN + cid = record.get("CONTAINERID") + if cid: + grouped[cid].append(record) + # rows 離開 scope 即被 GC +``` + +**記憶體改善估算**: + +| 項目 | 修改前 | 修改後 | +|------|--------|--------| +| cursor buffer | 全量 (100K+ rows) | 5000 rows | +| DataFrame | 全量 | 無 | +| grouped dict | 全量(最終結果) | 全量(最終結果) | +| **峰值** | ~3x 全量 | ~1.05x 全量 | + +grouped dict 仍然是全量,但省去了 fetchall list + DataFrame 的兩份副本。 +對於 50K CIDs × 10 events = 500K records,從 ~1.5GB 降到 ~500MB。 + +### D4: trace_routes 避免雙份持有 + +**決策**:events endpoint 中 `raw_domain_results` 直接複用為 `results` 的來源, +`_flatten_domain_records` 在建完 flat list 後立刻 `del events_by_cid`。 + +目前的問題: +```python +raw_domain_results[domain] = events_by_cid # 持有 reference +rows = _flatten_domain_records(events_by_cid) # 建新 list +results[domain] = {"data": rows, "count": len(rows)} +# → events_by_cid 和 rows 同時存在 +``` + +修改後: +```python +events_by_cid = future.result() +rows = _flatten_domain_records(events_by_cid) +results[domain] = {"data": rows, "count": len(rows)} +if is_msd: + raw_domain_results[domain] = events_by_cid # MSD 需要 group-by-CID 結構 +else: + del events_by_cid # 非 MSD 立刻釋放 +``` + +### D5: Gunicorn workers 降為 2 + systemd MemoryMax + +**決策**: +- `.env.example` 中 `GUNICORN_WORKERS` 預設改為 2 +- `deploy/mes-dashboard.service` 加入 `MemoryHigh=5G` 和 `MemoryMax=6G` + +**理由**: +- 4 workers × 大查詢 = 記憶體競爭嚴重 +- 2 workers × 4 threads = 8 request threads,足夠處理並行請求 +- `MemoryHigh=5G`:超過後 kernel 開始 reclaim,但不殺進程 +- `MemoryMax=6G`:硬限,超過直接 OOM kill service(保護 host OS) +- 保留 1GB 給 OS + Redis + 其他服務 + +## Risks / Trade-offs + +| 風險 | 緩解措施 | +|------|---------| +| 50K CID 上限可能擋住合理查詢 | env var 可調;提案 2 實作後改走 async | +| fetchmany iterator 模式下 cursor 持有時間更長 | timeout_seconds=60 限制;semaphore 限制並行 | +| grouped dict 最終仍全量 | 這是 API contract(需回傳所有結果);提案 3 的 streaming 才能根本解決 | +| workers=2 降低並行處理能力 | 歷史頁查詢是 semaphore 限制的,降 workers 主要影響即時頁 throughput(但即時頁很輕量) | +| MemoryMax kill service 會中斷所有在線使用者 | systemd Restart=always 自動重啟;比 host OS crash 好得多 | diff --git a/openspec/changes/archive/trace-events-memory-triage/proposal.md b/openspec/changes/archive/trace-events-memory-triage/proposal.md new file mode 100644 index 0000000..6365d48 --- /dev/null +++ b/openspec/changes/archive/trace-events-memory-triage/proposal.md @@ -0,0 +1,39 @@ +## Why + +2026-02-25 生產環境 trace pipeline 處理 114K CIDs(TMTT 站 + 5 個月日期範圍)時, +worker 被 OOM SIGKILL(7GB VM,無 swap)。pool 隔離已完成,連線不再互搶, +但 events 階段的記憶體使用是真正瓶頸: + +1. 
`cursor.fetchall()` 一次載入全部 rows(數十萬筆) +2. `pd.DataFrame(rows)` 複製一份 +3. `df.iterrows()` + `row.to_dict()` 再一份 +4. `grouped[cid].append(record)` 累積到最終 dict +5. `raw_domain_results[domain]` + `results[domain]["data"]` 在 trace_routes 同時持有雙份 + +114K CIDs × 2 domains,峰值同時存在 3-4 份完整資料副本,每份數百 MB → 2-4 GB 單一 domain。 +7GB VM(4 workers)完全無法承受。 + +## What Changes + +- **Admission control**:trace events endpoint 加 CID 數量上限判斷,超過閾值回 HTTP 413 +- **分批處理**:`read_sql_df_slow` 改用 `cursor.fetchmany()` 取代 `fetchall()`,不建 DataFrame +- **EventFetcher 逐批 group**:每批 fetch 完立刻 group 到結果 dict,釋放 batch 記憶體 +- **trace_routes 避免雙份持有**:`raw_domain_results` 與 `results` 合併為單一資料結構 +- **Gunicorn workers 降為 2**:降低單機記憶體競爭 +- **systemd MemoryMax**:加 cgroup 記憶體保護,避免 OOM 殺死整台 VM +- **更新 .env.example**:新增 `TRACE_EVENTS_CID_LIMIT`、`DB_SLOW_FETCHMANY_SIZE` 等 env 文件 +- **更新 deploy/mes-dashboard.service**:加入 `MemoryHigh` 和 `MemoryMax` + +## Capabilities + +### Modified Capabilities + +- `trace-staged-api`: events endpoint 加入 admission control(CID 上限) +- `event-fetcher-unified`: 分批 group 記憶體優化,取消 DataFrame 中間層 + +## Impact + +- **後端核心**:database.py(fetchmany)、event_fetcher.py(逐批 group)、trace_routes.py(admission control + 記憶體管理) +- **部署設定**:gunicorn.conf.py、.env.example、deploy/mes-dashboard.service +- **不影響**:前端、即時監控頁、其他 service(reject_history、hold_history 等) +- **前置條件**:trace-pipeline-pool-isolation(已完成) diff --git a/openspec/changes/archive/trace-events-memory-triage/specs/event-fetcher-unified/spec.md b/openspec/changes/archive/trace-events-memory-triage/specs/event-fetcher-unified/spec.md new file mode 100644 index 0000000..83457ab --- /dev/null +++ b/openspec/changes/archive/trace-events-memory-triage/specs/event-fetcher-unified/spec.md @@ -0,0 +1,15 @@ +## MODIFIED Requirements + +### Requirement: EventFetcher SHALL use streaming fetch for batch queries +`EventFetcher._fetch_batch` SHALL use `read_sql_df_slow_iter` (fetchmany-based iterator) instead of `read_sql_df` (fetchall + DataFrame) to reduce peak memory usage. + +#### Scenario: Batch query memory optimization +- **WHEN** EventFetcher executes a batch query for a domain +- **THEN** the query SHALL use `cursor.fetchmany(batch_size)` (env: `DB_SLOW_FETCHMANY_SIZE`, default: 5000) instead of `cursor.fetchall()` +- **THEN** rows SHALL be converted directly to dicts via `dict(zip(columns, row))` without building a DataFrame +- **THEN** each fetchmany batch SHALL be grouped into the result dict immediately, allowing the batch rows to be garbage collected + +#### Scenario: Existing API contract preserved +- **WHEN** EventFetcher.fetch_events() returns results +- **THEN** the return type SHALL remain `Dict[str, List[Dict[str, Any]]]` (grouped by CONTAINERID) +- **THEN** the result SHALL be identical to the previous DataFrame-based implementation diff --git a/openspec/changes/archive/trace-events-memory-triage/specs/trace-staged-api/spec.md b/openspec/changes/archive/trace-events-memory-triage/specs/trace-staged-api/spec.md new file mode 100644 index 0000000..528fbdf --- /dev/null +++ b/openspec/changes/archive/trace-events-memory-triage/specs/trace-staged-api/spec.md @@ -0,0 +1,19 @@ +## MODIFIED Requirements + +### Requirement: Trace events endpoint SHALL manage memory for large queries +The events endpoint SHALL proactively release memory after processing large CID sets. 
+ +#### Scenario: Admission control for non-MSD profiles +- **WHEN** the events endpoint receives a non-MSD profile request with `container_ids` count exceeding `TRACE_EVENTS_CID_LIMIT` (env: `TRACE_EVENTS_CID_LIMIT`, default: 50000) +- **THEN** the endpoint SHALL return HTTP 413 with `{ "error": "...", "code": "CID_LIMIT_EXCEEDED", "cid_count": N, "limit": M }` +- **THEN** Oracle DB connection pool SHALL NOT be consumed + +#### Scenario: MSD profile bypasses CID hard limit +- **WHEN** the events endpoint receives a `mid_section_defect` profile request regardless of CID count +- **THEN** the endpoint SHALL proceed with normal processing (no CID hard limit) +- **THEN** if CID count exceeds 50000, the endpoint SHALL log a warning with `cid_count` for monitoring + +#### Scenario: Non-MSD profile avoids double memory retention +- **WHEN** a non-MSD events request completes domain fetching +- **THEN** the `events_by_cid` reference SHALL be deleted immediately after `_flatten_domain_records` +- **THEN** only the flattened `results` dict SHALL remain in memory diff --git a/openspec/changes/archive/trace-events-memory-triage/tasks.md b/openspec/changes/archive/trace-events-memory-triage/tasks.md new file mode 100644 index 0000000..1b0a957 --- /dev/null +++ b/openspec/changes/archive/trace-events-memory-triage/tasks.md @@ -0,0 +1,37 @@ +## 1. Admission Control (profile-aware) + +- [x] 1.1 Add `TRACE_EVENTS_CID_LIMIT` env var (default 50000) to `trace_routes.py` +- [x] 1.2 Add CID count check in `events()` endpoint: for non-MSD profiles, if `len(container_ids) > TRACE_EVENTS_CID_LIMIT`, return HTTP 413 with `{ "code": "CID_LIMIT_EXCEEDED", "cid_count": N, "limit": M }` +- [x] 1.3 For MSD profile: bypass CID hard limit, log warning when CID count > 50000 +- [x] 1.4 Add unit tests: non-MSD CID > limit → 413; MSD CID > limit → proceeds normally + +## 2. Batch Fetch (fetchmany) in database.py + +- [x] 2.1 Add `read_sql_df_slow_iter(sql, params, timeout_seconds, batch_size)` generator function to `database.py` that yields `(columns, rows)` tuples using `cursor.fetchmany(batch_size)` +- [x] 2.2 Add `DB_SLOW_FETCHMANY_SIZE` to `get_db_runtime_config()` (default 5000) +- [x] 2.3 Add unit test for `read_sql_df_slow_iter` (mock cursor, verify fetchmany calls and yields) + +## 3. EventFetcher Memory Optimization + +- [x] 3.1 Modify `_fetch_batch` in `event_fetcher.py` to use `read_sql_df_slow_iter` instead of `read_sql_df` — iterate rows directly, skip DataFrame, group to `grouped` dict immediately +- [x] 3.2 Update `_sanitize_record` to work with `dict(zip(columns, row))` instead of `row.to_dict()` +- [x] 3.3 Add unit test verifying EventFetcher uses `read_sql_df_slow_iter` import +- [x] 3.4 Update existing EventFetcher tests (mock `read_sql_df_slow_iter` instead of `read_sql_df`) + +## 4. trace_routes Memory Optimization + +- [x] 4.1 Modify events endpoint: only keep `raw_domain_results[domain]` for MSD profile; for non-MSD, `del events_by_cid` after flattening +- [x] 4.2 Verify existing `del raw_domain_results` and `gc.collect()` logic still correct after refactor + +## 5. 
Deployment Configuration + +- [x] 5.1 Update `.env.example`: add `TRACE_EVENTS_CID_LIMIT`, `DB_SLOW_FETCHMANY_SIZE` with descriptions +- [x] 5.2 Update `.env.example`: change `GUNICORN_WORKERS` default comment to recommend 2 for ≤ 8GB RAM +- [x] 5.3 Update `.env.example`: change `TRACE_EVENTS_MAX_WORKERS` and `EVENT_FETCHER_MAX_WORKERS` default to 2 +- [x] 5.4 Update `deploy/mes-dashboard.service`: add `MemoryHigh=5G` and `MemoryMax=6G` +- [x] 5.5 Update `deploy/mes-dashboard.service`: add comment explaining memory limits + +## 6. Verification + +- [x] 6.1 Run `python -m pytest tests/ -v` — all existing tests pass (1069 passed, 152 skipped) +- [x] 6.2 Verify `.env.example` env var documentation is consistent with code defaults diff --git a/openspec/changes/archive/trace-streaming-response/.openspec.yaml b/openspec/changes/archive/trace-streaming-response/.openspec.yaml new file mode 100644 index 0000000..e331c97 --- /dev/null +++ b/openspec/changes/archive/trace-streaming-response/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-02-25 diff --git a/openspec/changes/archive/trace-streaming-response/design.md b/openspec/changes/archive/trace-streaming-response/design.md new file mode 100644 index 0000000..6e1827d --- /dev/null +++ b/openspec/changes/archive/trace-streaming-response/design.md @@ -0,0 +1,140 @@ +## Context + +提案 2(trace-async-job-queue)讓大查詢在獨立 worker 中執行, +但結果仍然全量 materialize 到 Redis(job result)和前端記憶體。 + +114K CIDs × 2 domains 的結果 JSON 可達 200-500MB: +- Worker 記憶體:grouped dict ~500MB + JSON serialize ~500MB = ~1GB 峰值 +- Redis:SETEX 500MB 的 key 耗時 5-10s,阻塞其他操作 +- 前端:瀏覽器解析 500MB JSON freeze UI 數十秒 + +串流回傳讓 server 逐批產生、前端逐批消費,記憶體使用只與每批大小成正比。 + +## Goals / Non-Goals + +**Goals:** +- Job 結果以 NDJSON 串流回傳,避免全量 materialize +- EventFetcher 支援 iterator 模式,逐批 yield 結果 +- 前端用 ReadableStream 逐行解析,逐批渲染 +- 結果也支援分頁 API(給不支援串流的 consumer 使用) + +**Non-Goals:** +- 不改動同步路徑(CID < 閾值仍走現有 JSON 回傳) +- 不做 WebSocket(NDJSON over HTTP 更簡單、更通用) +- 不做 Server-Sent Events(SSE 只支援 text/event-stream,不適合大 payload) +- 不修改 MSD aggregation(aggregation 需要全量資料,但結果較小) + +## Decisions + +### D1: NDJSON 格式 + +**決策**:使用 Newline Delimited JSON(NDJSON)作為串流格式。 + +``` +Content-Type: application/x-ndjson + +{"type":"meta","job_id":"abc123","domains":["history","materials"],"cid_count":114892} +{"type":"domain_start","domain":"history","batch":1,"total_batches":23} +{"type":"records","domain":"history","batch":1,"data":[...5000 records...]} +{"type":"records","domain":"history","batch":2,"data":[...5000 records...]} +... +{"type":"domain_end","domain":"history","total_records":115000} +{"type":"domain_start","domain":"materials","batch":1,"total_batches":12} +... +{"type":"aggregation","data":{...}} +{"type":"complete","elapsed_seconds":285} +``` + +**env var**:`TRACE_STREAM_BATCH_SIZE`(預設 5000 records/batch) + +**理由**: +- NDJSON 是業界標準串流 JSON 格式(Elasticsearch、BigQuery、GitHub API 都用) +- 每行是獨立 JSON,前端可逐行 parse(不需要等整個 response) +- 5000 records/batch ≈ 2-5MB,瀏覽器可即時渲染 +- 與 HTTP/1.1 chunked transfer 完美搭配 + +### D2: EventFetcher iterator 模式 + +**決策**:新增 `fetch_events_iter()` 方法,yield 每批 grouped records。 + +```python +@staticmethod +def fetch_events_iter(container_ids, domain, batch_size=5000): + """Yield dicts of {cid: [records]} in batches.""" + # ... same SQL building logic ... 
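+    # (sketch of the elided SQL-building step, assumed to mirror _fetch_batch in the
+    #  trace-events-memory-triage design; QueryBuilder usage copied from that design, not verified here)
+    # builder = QueryBuilder(); builder.add_in_condition(filter_column, oracle_batch_ids)
+    # sql = EventFetcher._build_domain_sql(domain, builder.get_conditions_sql())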
+ for oracle_batch_ids in batches: + for columns, rows in read_sql_df_slow_iter(sql, params): + batch_grouped = defaultdict(list) + for row in rows: + record = dict(zip(columns, row)) + cid = record.get("CONTAINERID") + if cid: + batch_grouped[cid].append(record) + yield dict(batch_grouped) +``` + +**理由**: +- 與 `fetch_events()` 共存,不影響同步路徑 +- 每次 yield 只持有一個 fetchmany batch 的 grouped 結果 +- Worker 收到 yield 後立刻序列化寫出,不累積 + +### D3: 結果分頁 API + +**決策**:提供 REST 分頁 API 作為 NDJSON 的替代方案。 + +``` +GET /api/trace/job/{job_id}/result?domain=history&offset=0&limit=5000 +``` + +**回應格式**: +```json +{ + "domain": "history", + "offset": 0, + "limit": 5000, + "total": 115000, + "data": [... 5000 records ...] +} +``` + +**理由**: +- 某些 consumer(如外部系統)不支援 NDJSON 串流 +- 分頁 API 是標準 REST pattern +- 結果仍儲存在 Redis(但按 domain 分 key),每個 key 5000 records ≈ 5MB + +### D4: 前端 ReadableStream 消費 + +```javascript +async function consumeNDJSON(url, onChunk) { + const response = await fetch(url) + const reader = response.body.getReader() + const decoder = new TextDecoder() + let buffer = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() // 保留不完整的最後一行 + for (const line of lines) { + if (line.trim()) onChunk(JSON.parse(line)) + } + } +} +``` + +**理由**: +- ReadableStream 是瀏覽器原生 API,無需額外依賴 +- 逐行 parse 記憶體使用恆定(只與 batch_size 成正比) +- 可邊收邊渲染,使用者體驗好 + +## Risks / Trade-offs + +| 風險 | 緩解措施 | +|------|---------| +| NDJSON 不支援 HTTP 壓縮 | Flask 可配 gzip middleware;每行 5000 records 壓縮率高 | +| 中途斷線需重新開始 | 分頁 API 可從斷點繼續取;NDJSON 用於一次性消費 | +| 前端需要處理部分結果渲染 | 表格元件改用 virtual scroll(既有 vue-virtual-scroller) | +| MSD aggregation 仍需全量資料 | aggregation 在 worker 內部完成,只串流最終結果(較小) | +| 結果按 domain 分 key 增加 Redis key 數量 | TTL 清理 + key prefix 隔離 | diff --git a/openspec/changes/archive/trace-streaming-response/proposal.md b/openspec/changes/archive/trace-streaming-response/proposal.md new file mode 100644 index 0000000..a651c3f --- /dev/null +++ b/openspec/changes/archive/trace-streaming-response/proposal.md @@ -0,0 +1,39 @@ +## Why + +即使有非同步 job(提案 2)處理大查詢,結果 materialize 仍然是記憶體瓶頸: + +1. **job result 全量 JSON**:114K CIDs × 2 domains 的結果 JSON 可達數百 MB, + Redis 儲存 + 讀取 + Flask jsonify 序列化,峰值記憶體仍高 +2. **前端一次性解析**:瀏覽器解析數百 MB JSON 會 freeze UI +3. 
**Redis 單 key 限制**:大 value 影響 Redis 效能(阻塞其他操作) + +串流回傳(NDJSON/分頁)讓 server 逐批產生資料、前端逐批消費, +記憶體使用與 CID 總數解耦,只與每批大小成正比。 + +## What Changes + +- **EventFetcher 支援 iterator 模式**:`fetch_events_iter()` yield 每批結果而非累積全部 +- **新增 `GET /api/trace/job/{job_id}/stream`**:NDJSON 串流回傳 job 結果 +- **前端 useTraceProgress 串流消費**:用 `fetch()` + `ReadableStream` 逐行解析 NDJSON +- **結果分頁 API**:`GET /api/trace/job/{job_id}/result?domain=history&offset=0&limit=5000` +- **更新 .env.example**:`TRACE_STREAM_BATCH_SIZE` + +## Capabilities + +### New Capabilities + +- `trace-streaming-response`: NDJSON 串流回傳 + 結果分頁 + +### Modified Capabilities + +- `event-fetcher-unified`: 新增 iterator 模式(`fetch_events_iter`) +- `trace-staged-api`: job result 串流 endpoint +- `progressive-trace-ux`: 前端串流消費 + 逐批渲染 + +## Impact + +- **後端核心**:event_fetcher.py(iterator 模式)、trace_routes.py(stream endpoint) +- **前端修改**:useTraceProgress.js(ReadableStream 消費) +- **部署設定**:.env.example(`TRACE_STREAM_BATCH_SIZE`) +- **不影響**:同步路徑(CID < 閾值仍走現有流程)、其他 service、即時監控頁 +- **前置條件**:trace-async-job-queue(提案 2) diff --git a/openspec/changes/archive/trace-streaming-response/specs/event-fetcher-unified/spec.md b/openspec/changes/archive/trace-streaming-response/specs/event-fetcher-unified/spec.md new file mode 100644 index 0000000..f1b6699 --- /dev/null +++ b/openspec/changes/archive/trace-streaming-response/specs/event-fetcher-unified/spec.md @@ -0,0 +1,14 @@ +## ADDED Requirements + +### Requirement: EventFetcher SHALL support iterator mode for streaming +`EventFetcher.fetch_events_iter()` SHALL yield batched results for streaming consumption. + +#### Scenario: Iterator mode yields batches +- **WHEN** `fetch_events_iter(container_ids, domain, batch_size)` is called +- **THEN** it SHALL yield `Dict[str, List[Dict]]` batches (grouped by CONTAINERID) +- **THEN** each yielded batch SHALL contain results from one `cursor.fetchmany()` call +- **THEN** memory usage SHALL be proportional to `batch_size`, not total result count + +#### Scenario: Iterator mode cache behavior +- **WHEN** `fetch_events_iter` is used for large CID sets (> CACHE_SKIP_CID_THRESHOLD) +- **THEN** per-domain cache SHALL be skipped (consistent with `fetch_events` behavior) diff --git a/openspec/changes/archive/trace-streaming-response/specs/trace-staged-api/spec.md b/openspec/changes/archive/trace-streaming-response/specs/trace-staged-api/spec.md new file mode 100644 index 0000000..118b157 --- /dev/null +++ b/openspec/changes/archive/trace-streaming-response/specs/trace-staged-api/spec.md @@ -0,0 +1,22 @@ +## ADDED Requirements + +### Requirement: Trace API SHALL expose NDJSON stream endpoint for job results +`GET /api/trace/job/{job_id}/stream` SHALL return job results as NDJSON (Newline Delimited JSON) stream. 
+ +#### Scenario: Stream completed job result +- **WHEN** a client requests stream for a completed job +- **THEN** the endpoint SHALL return `Content-Type: application/x-ndjson` +- **THEN** the response SHALL contain ordered NDJSON lines: `meta` → `domain_start` → `records` batches → `domain_end` → `aggregation` (if applicable) → `complete` +- **THEN** each `records` line SHALL contain at most `TRACE_STREAM_BATCH_SIZE` (env, default: 5000) records + +#### Scenario: Stream for non-completed job +- **WHEN** a client requests stream for a non-completed job +- **THEN** the endpoint SHALL return HTTP 409 with `{ "error": "...", "code": "JOB_NOT_COMPLETE" }` + +### Requirement: Job result pagination SHALL support domain-level offset/limit +`GET /api/trace/job/{job_id}/result` SHALL support fine-grained pagination per domain. + +#### Scenario: Paginated domain result +- **WHEN** a client requests `?domain=history&offset=0&limit=5000` +- **THEN** the endpoint SHALL return only the specified slice of records for that domain +- **THEN** the response SHALL include `total` count for the domain diff --git a/openspec/changes/archive/trace-streaming-response/tasks.md b/openspec/changes/archive/trace-streaming-response/tasks.md new file mode 100644 index 0000000..b1dc4e4 --- /dev/null +++ b/openspec/changes/archive/trace-streaming-response/tasks.md @@ -0,0 +1,35 @@ +## 1. EventFetcher Iterator Mode + +- [ ] 1.1 Add `fetch_events_iter(container_ids, domain, batch_size)` static method to `EventFetcher` class: yields `Dict[str, List[Dict]]` batches using `read_sql_df_slow_iter` +- [ ] 1.2 Add unit tests for `fetch_events_iter` (mock read_sql_df_slow_iter, verify batch yields) + +## 2. NDJSON Stream Endpoint + +- [x] 2.1 Add `GET /api/trace/job//stream` endpoint: returns `Content-Type: application/x-ndjson` with Flask `Response(generate(), mimetype='application/x-ndjson')` +- [x] 2.2 Implement NDJSON generator: yield `meta` → `domain_start` → `records` batches → `domain_end` → `aggregation` → `complete` lines +- [x] 2.3 Add `TRACE_STREAM_BATCH_SIZE` env var (default 5000) +- [x] 2.4 Modify `execute_trace_events_job()` to store results in chunked Redis keys: `trace:job:{job_id}:result:{domain}:{chunk_idx}` +- [x] 2.5 Add unit tests for NDJSON stream endpoint + +## 3. Result Pagination API + +- [x] 3.1 Enhance `GET /api/trace/job//result` with `domain`, `offset`, `limit` query params +- [x] 3.2 Implement pagination over chunked Redis keys +- [x] 3.3 Add unit tests for pagination (offset/limit boundary cases) + +## 4. Frontend Streaming Consumer + +- [x] 4.1 Add `consumeNDJSONStream(url, onChunk)` utility using `ReadableStream` +- [x] 4.2 Modify `useTraceProgress.js`: for async jobs, prefer stream endpoint over full result endpoint +- [x] 4.3 Add progressive rendering: update table data as each NDJSON batch arrives +- [x] 4.4 Add error handling: stream interruption, malformed NDJSON lines + +## 5. Deployment + +- [x] 5.1 Update `.env.example`: add `TRACE_STREAM_BATCH_SIZE` with description + +## 6. 
Verification + +- [x] 6.1 Run `python -m pytest tests/ -v` — all existing tests pass +- [x] 6.2 Run `cd frontend && npm run build` — frontend builds successfully +- [ ] 6.3 Manual test: verify NDJSON stream produces valid output for multi-domain query diff --git a/requirements.txt b/requirements.txt index d6071b3..5784a4e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,6 +23,9 @@ openpyxl>=3.0.0 # Excel file support redis>=5.0.0,<6.0.0 hiredis>=2.0.0,<4.0.0 # C parser for better Redis performance +# Task Queue (async trace jobs) +rq>=1.16.0,<2.0.0 + # HTTP Client requests>=2.28.0,<3.0.0 diff --git a/scripts/deploy.sh b/scripts/deploy.sh index e3179af..7aec6ee 100644 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -242,9 +242,14 @@ show_next_steps() { echo " sudo chmod 640 .env" echo " sudo cp deploy/mes-dashboard.service /etc/systemd/system/" echo " sudo cp deploy/mes-dashboard-watchdog.service /etc/systemd/system/" + echo " sudo cp deploy/mes-dashboard-trace-worker.service /etc/systemd/system/" echo " sudo systemctl daemon-reload" echo " sudo systemctl enable --now mes-dashboard mes-dashboard-watchdog" echo "" + echo "Optional: enable async trace worker (for large queries)" + echo " # Set TRACE_WORKER_ENABLED=true in .env" + echo " sudo systemctl enable --now mes-dashboard-trace-worker" + echo "" echo "==========================================" } diff --git a/scripts/start_server.sh b/scripts/start_server.sh index 6c3d012..a4294d4 100755 --- a/scripts/start_server.sh +++ b/scripts/start_server.sh @@ -25,6 +25,9 @@ PORT=$(echo "$DEFAULT_PORT" | cut -d: -f2) REDIS_ENABLED="${REDIS_ENABLED:-true}" # Worker watchdog configuration WATCHDOG_ENABLED="${WATCHDOG_ENABLED:-true}" +# RQ trace worker configuration +TRACE_WORKER_ENABLED="${TRACE_WORKER_ENABLED:-false}" +TRACE_WORKER_QUEUE="${TRACE_WORKER_QUEUE:-trace-events}" # Colors for output RED='\033[0;31m' @@ -73,6 +76,8 @@ resolve_runtime_paths() { WATCHDOG_PID_FILE="${WATCHDOG_PID_FILE:-${WATCHDOG_RUNTIME_DIR}/gunicorn.pid}" WATCHDOG_STATE_FILE="${WATCHDOG_STATE_FILE:-${WATCHDOG_RUNTIME_DIR}/mes_dashboard_restart_state.json}" WATCHDOG_PROCESS_PID_FILE="${WATCHDOG_PROCESS_PID_FILE:-${WATCHDOG_RUNTIME_DIR}/worker_watchdog.pid}" + RQ_WORKER_PID_FILE="${WATCHDOG_RUNTIME_DIR}/rq_trace_worker.pid" + RQ_WORKER_LOG="${LOG_DIR}/rq_worker.log" PID_FILE="${WATCHDOG_PID_FILE}" export WATCHDOG_RUNTIME_DIR WATCHDOG_RESTART_FLAG WATCHDOG_PID_FILE WATCHDOG_STATE_FILE WATCHDOG_PROCESS_PID_FILE } @@ -585,6 +590,126 @@ stop_watchdog() { return 0 } +# ============================================================ +# RQ Trace Worker Management Functions +# ============================================================ +get_rq_worker_pid() { + if [ -f "$RQ_WORKER_PID_FILE" ]; then + local pid + pid=$(cat "$RQ_WORKER_PID_FILE" 2>/dev/null || true) + if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then + echo "$pid" + return 0 + fi + rm -f "$RQ_WORKER_PID_FILE" + fi + + local discovered_pid + discovered_pid=$(pgrep -f "[r]q worker.*${TRACE_WORKER_QUEUE}" 2>/dev/null | head -1 || true) + if [ -n "${discovered_pid}" ] && kill -0 "${discovered_pid}" 2>/dev/null; then + echo "${discovered_pid}" > "$RQ_WORKER_PID_FILE" + echo "${discovered_pid}" + return 0 + fi + + return 1 +} + +is_rq_worker_running() { + get_rq_worker_pid &>/dev/null +} + +start_rq_worker() { + if ! 
is_enabled "${TRACE_WORKER_ENABLED:-false}"; then + log_info "RQ trace worker is disabled (TRACE_WORKER_ENABLED=${TRACE_WORKER_ENABLED:-false})" + return 0 + fi + + if [ "$REDIS_ENABLED" != "true" ]; then + log_warn "Redis is disabled; cannot start RQ trace worker" + return 0 + fi + + if is_rq_worker_running; then + local pid + pid=$(get_rq_worker_pid) + log_success "RQ trace worker already running (PID: ${pid})" + return 0 + fi + + # Check if rq is installed + if ! python -c "import rq" 2>/dev/null; then + log_warn "rq package not installed; skip trace worker (pip install rq)" + return 0 + fi + + log_info "Starting RQ trace worker (queue: ${TRACE_WORKER_QUEUE})..." + local redis_url="${REDIS_URL:-redis://localhost:6379/0}" + if command -v setsid >/dev/null 2>&1; then + setsid rq worker "${TRACE_WORKER_QUEUE}" --url "${redis_url}" >> "$RQ_WORKER_LOG" 2>&1 < /dev/null & + else + nohup rq worker "${TRACE_WORKER_QUEUE}" --url "${redis_url}" >> "$RQ_WORKER_LOG" 2>&1 < /dev/null & + fi + local pid=$! + echo "$pid" > "$RQ_WORKER_PID_FILE" + + sleep 1 + if kill -0 "$pid" 2>/dev/null; then + log_success "RQ trace worker started (PID: ${pid})" + return 0 + fi + + rm -f "$RQ_WORKER_PID_FILE" + log_error "Failed to start RQ trace worker" + return 1 +} + +stop_rq_worker() { + if ! is_rq_worker_running; then + rm -f "$RQ_WORKER_PID_FILE" + return 0 + fi + + local pid + pid=$(get_rq_worker_pid) + log_info "Stopping RQ trace worker (PID: ${pid})..." + kill -TERM "$pid" 2>/dev/null || true + + local count=0 + while kill -0 "$pid" 2>/dev/null && [ $count -lt 10 ]; do + sleep 1 + count=$((count + 1)) + done + + if kill -0 "$pid" 2>/dev/null; then + kill -9 "$pid" 2>/dev/null || true + sleep 1 + fi + + rm -f "$RQ_WORKER_PID_FILE" + if kill -0 "$pid" 2>/dev/null; then + log_error "Failed to stop RQ trace worker" + return 1 + fi + + log_success "RQ trace worker stopped" + return 0 +} + +rq_worker_status() { + if ! is_enabled "${TRACE_WORKER_ENABLED:-false}"; then + echo -e " RQ Worker:${YELLOW} DISABLED${NC}" + return 0 + fi + + if is_rq_worker_running; then + local pid=$(get_rq_worker_pid) + echo -e " RQ Worker:${GREEN} RUNNING${NC} (PID: ${pid}, queue: ${TRACE_WORKER_QUEUE})" + else + echo -e " RQ Worker:${RED} STOPPED${NC}" + fi +} + do_start() { local foreground=false @@ -662,6 +787,7 @@ do_start() { log_info "Access URL: http://localhost:${PORT}" log_info "Logs: ${LOG_DIR}/" start_watchdog || return 1 + start_rq_worker echo "[$(timestamp)] Server started (PID: ${pid})" >> "$STARTUP_LOG" else log_error "Failed to start server" @@ -723,6 +849,7 @@ do_stop() { fi fi + stop_rq_worker stop_watchdog } @@ -768,6 +895,7 @@ do_status() { else echo -e " Watchdog:${YELLOW} DISABLED${NC}" fi + rq_worker_status if is_running; then echo "" diff --git a/src/mes_dashboard/core/database.py b/src/mes_dashboard/core/database.py index 68b25cf..cbe8dc0 100644 --- a/src/mes_dashboard/core/database.py +++ b/src/mes_dashboard/core/database.py @@ -201,6 +201,10 @@ def get_db_runtime_config(refresh: bool = False) -> Dict[str, Any]: "DB_SLOW_MAX_CONCURRENT", config_class.DB_SLOW_MAX_CONCURRENT, ), + "slow_fetchmany_size": _from_app_or_env_int( + "DB_SLOW_FETCHMANY_SIZE", + 5000, + ), } return _DB_RUNTIME_CONFIG.copy() @@ -696,6 +700,104 @@ def read_sql_df_slow( sem.release() +def read_sql_df_slow_iter( + sql: str, + params: Optional[Dict[str, Any]] = None, + timeout_seconds: Optional[int] = None, + batch_size: Optional[int] = None, +): + """Execute a slow SQL query and yield batches of (columns, rows) without DataFrame. 
+ + Uses cursor.fetchmany() to avoid full materialization in memory. + Each yielded batch is a tuple of (columns: List[str], rows: List[tuple]). + + Args: + sql: SQL query string with Oracle bind variables. + params: Optional dict of parameter values. + timeout_seconds: Call timeout in seconds. None = use config default. + batch_size: Number of rows per fetchmany call. None = use + DB_SLOW_FETCHMANY_SIZE from config (default 5000). + """ + global _SLOW_QUERY_ACTIVE + + runtime = get_db_runtime_config() + if timeout_seconds is None: + timeout_ms = runtime["slow_call_timeout_ms"] + else: + timeout_ms = timeout_seconds * 1000 + + if batch_size is None: + batch_size = runtime["slow_fetchmany_size"] + + sem = _get_slow_query_semaphore() + acquired = sem.acquire(timeout=60) + if not acquired: + raise RuntimeError( + "Slow-query concurrency limit reached; try again later" + ) + + with _SLOW_QUERY_LOCK: + _SLOW_QUERY_ACTIVE += 1 + active = _SLOW_QUERY_ACTIVE + + logger.info("Slow query iter starting (active=%s, timeout_ms=%s, batch_size=%s)", active, timeout_ms, batch_size) + start_time = time.time() + conn = None + total_rows = 0 + try: + conn = oracledb.connect( + **DB_CONFIG, + tcp_connect_timeout=runtime["tcp_connect_timeout"], + retry_count=runtime["retry_count"], + retry_delay=runtime["retry_delay"], + ) + conn.call_timeout = timeout_ms + with _DIRECT_CONN_LOCK: + global _DIRECT_CONN_COUNTER + _DIRECT_CONN_COUNTER += 1 + logger.debug( + "Slow-query iter connection established (call_timeout_ms=%s)", timeout_ms + ) + + cursor = conn.cursor() + cursor.execute(sql, params or {}) + columns = [desc[0].upper() for desc in cursor.description] + + while True: + rows = cursor.fetchmany(batch_size) + if not rows: + break + total_rows += len(rows) + yield columns, rows + + cursor.close() + + elapsed = time.time() - start_time + if elapsed > 1.0: + sql_preview = sql.strip().replace('\n', ' ')[:100] + logger.warning(f"Slow query iter ({elapsed:.2f}s, rows={total_rows}): {sql_preview}...") + else: + logger.debug(f"Query iter completed in {elapsed:.3f}s, rows={total_rows}") + + except Exception as exc: + elapsed = time.time() - start_time + ora_code = _extract_ora_code(exc) + sql_preview = sql.strip().replace('\n', ' ')[:100] + logger.error( + f"Query iter failed after {elapsed:.2f}s - ORA-{ora_code}: {exc} | SQL: {sql_preview}..." 
+ ) + raise + finally: + if conn: + try: + conn.close() + except Exception: + pass + with _SLOW_QUERY_LOCK: + _SLOW_QUERY_ACTIVE -= 1 + sem.release() + + # ============================================================ # Table Utilities # ============================================================ diff --git a/src/mes_dashboard/routes/trace_routes.py b/src/mes_dashboard/routes/trace_routes.py index f04243c..514bdf9 100644 --- a/src/mes_dashboard/routes/trace_routes.py +++ b/src/mes_dashboard/routes/trace_routes.py @@ -18,7 +18,7 @@ import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List, Optional -from flask import Blueprint, jsonify, request +from flask import Blueprint, Response, jsonify, request from mes_dashboard.core.cache import cache_get, cache_set from mes_dashboard.core.rate_limit import configured_rate_limit @@ -31,6 +31,14 @@ from mes_dashboard.services.mid_section_defect_service import ( resolve_trace_seed_lots, ) from mes_dashboard.services.query_tool_service import resolve_lots +from mes_dashboard.services.trace_job_service import ( + TRACE_ASYNC_CID_THRESHOLD, + enqueue_trace_events_job, + get_job_result, + get_job_status, + is_async_available, + stream_job_result_ndjson, +) logger = logging.getLogger("mes_dashboard.trace_routes") @@ -38,6 +46,7 @@ trace_bp = Blueprint("trace", __name__, url_prefix="/api/trace") TRACE_SLOW_THRESHOLD_SECONDS = float(os.getenv('TRACE_SLOW_THRESHOLD_SECONDS', '15')) TRACE_EVENTS_MAX_WORKERS = int(os.getenv('TRACE_EVENTS_MAX_WORKERS', '2')) +TRACE_EVENTS_CID_LIMIT = int(os.getenv('TRACE_EVENTS_CID_LIMIT', '50000')) TRACE_CACHE_TTL_SECONDS = 300 PROFILE_QUERY_TOOL = "query_tool" @@ -87,6 +96,14 @@ _TRACE_EVENTS_RATE_LIMIT = configured_rate_limit( default_window_seconds=60, ) +_TRACE_JOB_RATE_LIMIT = configured_rate_limit( + bucket="trace-job-status", + max_attempts_env="TRACE_JOB_RATE_LIMIT_MAX_REQUESTS", + window_seconds_env="TRACE_JOB_RATE_LIMIT_WINDOW_SECONDS", + default_max_attempts=60, + default_window_seconds=60, +) + def _json_body() -> Optional[Dict[str, Any]]: payload = request.get_json(silent=True) @@ -622,10 +639,49 @@ def events(): 400, ) + # Admission control: non-MSD profiles have a hard CID limit to prevent OOM. + # MSD (reject tracing) needs all CIDs for accurate aggregation statistics. + is_msd = (profile == PROFILE_MID_SECTION_DEFECT) + cid_count = len(container_ids) + + # Route large queries to async job queue when available. + # Falls through to sync path if RQ is not installed or Redis is down. 
+    if cid_count > TRACE_ASYNC_CID_THRESHOLD and is_async_available():
+        job_id, err = enqueue_trace_events_job(profile, container_ids, domains, payload)
+        if job_id is not None:
+            logger.info(
+                "trace events routed to async job_id=%s profile=%s cid_count=%s",
+                job_id, profile, cid_count,
+            )
+            return jsonify({
+                "stage": "events",
+                "async": True,
+                "job_id": job_id,
+                "status_url": f"/api/trace/job/{job_id}",
+                "stream_url": f"/api/trace/job/{job_id}/stream",
+            }), 202
+        logger.warning("trace async enqueue failed cid_count=%s: %s", cid_count, err)
+
+    if not is_msd and cid_count > TRACE_EVENTS_CID_LIMIT:
+        logger.warning(
+            "trace events CID limit exceeded profile=%s cid_count=%s limit=%s",
+            profile, cid_count, TRACE_EVENTS_CID_LIMIT,
+        )
+        return _error(
+            "CID_LIMIT_EXCEEDED",
+            f"container_ids count ({cid_count}) exceeds limit ({TRACE_EVENTS_CID_LIMIT})",
+            413,
+        )
+
+    if is_msd and cid_count > TRACE_EVENTS_CID_LIMIT:
+        logger.warning(
+            "trace events MSD large query proceeding cid_count=%s limit=%s",
+            cid_count, TRACE_EVENTS_CID_LIMIT,
+        )
+
     # For MSD profile, skip the events-level cache so that aggregation is
     # always recomputed with the current loss_reasons. EventFetcher still
     # provides per-domain Redis caching, so raw Oracle queries are avoided.
-    is_msd = (profile == PROFILE_MID_SECTION_DEFECT)
     events_cache_key = _events_cache_key(profile, domains, container_ids)
 
     if not is_msd:
@@ -655,9 +711,11 @@ def events():
             domain = futures[future]
             try:
                 events_by_cid = future.result()
-                raw_domain_results[domain] = events_by_cid
                 rows = _flatten_domain_records(events_by_cid)
                 results[domain] = {"data": rows, "count": len(rows)}
+                if is_msd:
+                    raw_domain_results[domain] = events_by_cid
+                # Non-MSD: events_by_cid goes out of scope here, no double retention
             except Exception as exc:
                 logger.error("events stage domain failed domain=%s: %s", domain, exc, exc_info=True)
                 failed_domains.append(domain)
@@ -694,3 +752,62 @@ def events():
         gc.collect()
 
     return jsonify(response)
+
+
+@trace_bp.route("/job/<job_id>", methods=["GET"])
+@_TRACE_JOB_RATE_LIMIT
+def job_status(job_id: str):
+    """Return the current status of an async trace job."""
+    status = get_job_status(job_id)
+    if status is None:
+        return _error("JOB_NOT_FOUND", "job not found or expired", 404)
+    return jsonify(status)
+
+
+@trace_bp.route("/job/<job_id>/result", methods=["GET"])
+@_TRACE_JOB_RATE_LIMIT
+def job_result(job_id: str):
+    """Return the result of a completed async trace job."""
+    status = get_job_status(job_id)
+    if status is None:
+        return _error("JOB_NOT_FOUND", "job not found or expired", 404)
+
+    if status["status"] not in ("finished",):
+        return jsonify({
+            "error": {"code": "JOB_NOT_COMPLETE", "message": "job has not completed yet"},
+            "status": status["status"],
+        }), 409
+
+    domain = request.args.get("domain")
+    offset = request.args.get("offset", 0, type=int)
+    limit = request.args.get("limit", 0, type=int)
+
+    result = get_job_result(job_id, domain=domain, offset=offset, limit=limit)
+    if result is None:
+        return _error("JOB_RESULT_EXPIRED", "job result has expired", 404)
+
+    return jsonify(result)
+
+
+@trace_bp.route("/job/<job_id>/stream", methods=["GET"])
+@_TRACE_JOB_RATE_LIMIT
+def job_stream(job_id: str):
+    """Stream completed job result as NDJSON for progressive frontend consumption."""
+    status = get_job_status(job_id)
+    if status is None:
+        return _error("JOB_NOT_FOUND", "job not found or expired", 404)
+
+    if status["status"] != "finished":
+        return jsonify({
+            "error": {"code": "JOB_NOT_COMPLETE", "message": "job has not
completed yet"}, + "status": status["status"], + }), 409 + + return Response( + stream_job_result_ndjson(job_id), + mimetype="application/x-ndjson", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) diff --git a/src/mes_dashboard/services/event_fetcher.py b/src/mes_dashboard/services/event_fetcher.py index 032dc9f..55105cb 100644 --- a/src/mes_dashboard/services/event_fetcher.py +++ b/src/mes_dashboard/services/event_fetcher.py @@ -13,7 +13,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List from mes_dashboard.core.cache import cache_get, cache_set -from mes_dashboard.core.database import read_sql_df_slow as read_sql_df +from mes_dashboard.core.database import read_sql_df_slow_iter from mes_dashboard.sql import QueryBuilder, SQLLoader logger = logging.getLogger("mes_dashboard.event_fetcher") @@ -238,41 +238,38 @@ class EventFetcher: filter_column = spec["filter_column"] match_mode = spec.get("match_mode", "in") - def _fetch_batch(batch_ids): + def _sanitize_value(v): + """Replace NaN float values with None for JSON-safe serialization.""" + if isinstance(v, float) and math.isnan(v): + return None + return v + + def _fetch_and_group_batch(batch_ids): + """Fetch a batch using fetchmany iterator and group into ``grouped``.""" builder = QueryBuilder() if match_mode == "contains": builder.add_or_like_conditions(filter_column, batch_ids, position="both") else: builder.add_in_condition(filter_column, batch_ids) sql = EventFetcher._build_domain_sql(domain, builder.get_conditions_sql()) - return batch_ids, read_sql_df(sql, builder.params, timeout_seconds=60) - def _sanitize_record(d): - """Replace NaN/NaT values with None for JSON-safe serialization.""" - for k, v in d.items(): - if isinstance(v, float) and math.isnan(v): - d[k] = None - return d - - def _process_batch_result(batch_ids, df): - if df is None or df.empty: - return - for _, row in df.iterrows(): - if domain == "jobs": - record = _sanitize_record(row.to_dict()) - containers = record.get("CONTAINERIDS") - if not isinstance(containers, str) or not containers: + for columns, rows in read_sql_df_slow_iter(sql, builder.params, timeout_seconds=60): + for row in rows: + record = {k: _sanitize_value(v) for k, v in zip(columns, row)} + if domain == "jobs": + containers = record.get("CONTAINERIDS") + if not isinstance(containers, str) or not containers: + continue + for cid in batch_ids: + if cid in containers: + enriched = dict(record) + enriched["CONTAINERID"] = cid + grouped[cid].append(enriched) continue - for cid in batch_ids: - if cid in containers: - enriched = dict(record) - enriched["CONTAINERID"] = cid - grouped[cid].append(enriched) - continue - cid = row.get("CONTAINERID") - if not isinstance(cid, str) or not cid: - continue - grouped[cid].append(_sanitize_record(row.to_dict())) + cid = record.get("CONTAINERID") + if not isinstance(cid, str) or not cid: + continue + grouped[cid].append(record) batches = [ normalized_ids[i:i + ORACLE_IN_BATCH_SIZE] @@ -281,15 +278,13 @@ class EventFetcher: if len(batches) <= 1: for batch in batches: - batch_ids, df = _fetch_batch(batch) - _process_batch_result(batch_ids, df) + _fetch_and_group_batch(batch) else: with ThreadPoolExecutor(max_workers=min(len(batches), EVENT_FETCHER_MAX_WORKERS)) as executor: - futures = {executor.submit(_fetch_batch, b): b for b in batches} + futures = {executor.submit(_fetch_and_group_batch, b): b for b in batches} for future in as_completed(futures): try: - batch_ids, df = future.result() 
- _process_batch_result(batch_ids, df) + future.result() except Exception: logger.error( "EventFetcher batch query failed domain=%s batch_size=%s", diff --git a/src/mes_dashboard/services/trace_job_service.py b/src/mes_dashboard/services/trace_job_service.py new file mode 100644 index 0000000..e8ead00 --- /dev/null +++ b/src/mes_dashboard/services/trace_job_service.py @@ -0,0 +1,616 @@ +# -*- coding: utf-8 -*- +"""Async trace job service using RQ (Redis Queue). + +Provides enqueue/status/result functions for long-running trace events queries. +The worker entry point ``execute_trace_events_job`` runs in a separate RQ worker +process — independent of gunicorn — with its own memory space. +""" + +from __future__ import annotations + +import gc +import json +import logging +import os +import time +import uuid +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Dict, List, Optional, Tuple + +from mes_dashboard.core.redis_client import get_key, get_redis_client + +logger = logging.getLogger("mes_dashboard.trace_job_service") + +# --------------------------------------------------------------------------- +# Configuration from environment +# --------------------------------------------------------------------------- +TRACE_ASYNC_CID_THRESHOLD = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000")) +TRACE_JOB_TTL_SECONDS = int(os.getenv("TRACE_JOB_TTL_SECONDS", "3600")) +TRACE_JOB_TIMEOUT_SECONDS = int(os.getenv("TRACE_JOB_TIMEOUT_SECONDS", "1800")) +TRACE_WORKER_QUEUE = os.getenv("TRACE_WORKER_QUEUE", "trace-events") +TRACE_EVENTS_MAX_WORKERS = int(os.getenv("TRACE_EVENTS_MAX_WORKERS", "2")) +TRACE_STREAM_BATCH_SIZE = int(os.getenv("TRACE_STREAM_BATCH_SIZE", "5000")) + +# --------------------------------------------------------------------------- +# RQ queue accessor +# --------------------------------------------------------------------------- +_RQ_AVAILABLE: Optional[bool] = None + + +def _check_rq_available() -> bool: + global _RQ_AVAILABLE + if _RQ_AVAILABLE is None: + try: + import rq # noqa: F401 + _RQ_AVAILABLE = True + except ImportError: + _RQ_AVAILABLE = False + return _RQ_AVAILABLE + + +def is_async_available() -> bool: + """Return True if RQ is installed and Redis is reachable.""" + if not _check_rq_available(): + return False + conn = get_redis_client() + return conn is not None + + +def _get_rq_queue(): + """Get RQ queue instance. 
Returns None if unavailable.""" + if not _check_rq_available(): + return None + conn = get_redis_client() + if conn is None: + return None + from rq import Queue + return Queue(TRACE_WORKER_QUEUE, connection=conn) + + +# --------------------------------------------------------------------------- +# Redis key helpers +# --------------------------------------------------------------------------- +def _meta_key(job_id: str) -> str: + return get_key(f"trace:job:{job_id}:meta") + + +def _result_key(job_id: str) -> str: + return get_key(f"trace:job:{job_id}:result") + + +def _result_meta_key(job_id: str) -> str: + return get_key(f"trace:job:{job_id}:result:meta") + + +def _result_chunk_key(job_id: str, domain: str, chunk_idx: int) -> str: + return get_key(f"trace:job:{job_id}:result:{domain}:{chunk_idx}") + + +def _result_aggregation_key(job_id: str) -> str: + return get_key(f"trace:job:{job_id}:result:aggregation") + + +# --------------------------------------------------------------------------- +# Public API: enqueue / status / result +# --------------------------------------------------------------------------- +def enqueue_trace_events_job( + profile: str, + container_ids: List[str], + domains: List[str], + payload: Dict[str, Any], +) -> Tuple[Optional[str], Optional[str]]: + """Enqueue an async trace events job. + + Returns: + (job_id, None) on success, (None, error_message) on failure. + """ + queue = _get_rq_queue() + if queue is None: + return None, "async job queue unavailable (Redis or RQ not installed)" + + job_id = f"trace-evt-{uuid.uuid4().hex[:12]}" + + conn = get_redis_client() + meta = { + "profile": profile, + "cid_count": str(len(container_ids)), + "domains": ",".join(domains), + "status": "queued", + "progress": "", + "created_at": str(time.time()), + "completed_at": "", + "error": "", + } + conn.hset(_meta_key(job_id), mapping=meta) + conn.expire(_meta_key(job_id), TRACE_JOB_TTL_SECONDS) + + try: + queue.enqueue( + execute_trace_events_job, + job_id, + profile, + container_ids, + domains, + payload, + job_id=job_id, + job_timeout=TRACE_JOB_TIMEOUT_SECONDS, + result_ttl=TRACE_JOB_TTL_SECONDS, + failure_ttl=TRACE_JOB_TTL_SECONDS, + ) + except Exception as exc: + logger.error("Failed to enqueue trace job: %s", exc, exc_info=True) + conn.delete(_meta_key(job_id)) + return None, f"enqueue failed: {exc}" + + logger.info( + "trace job enqueued job_id=%s profile=%s cid_count=%s domains=%s", + job_id, profile, len(container_ids), ",".join(domains), + ) + return job_id, None + + +def get_job_status(job_id: str) -> Optional[Dict[str, Any]]: + """Get trace job status from Redis metadata. Returns None if not found.""" + conn = get_redis_client() + if conn is None: + return None + + meta = conn.hgetall(_meta_key(job_id)) + if not meta: + return None + + created_at = float(meta.get("created_at", 0)) + elapsed = time.time() - created_at if created_at > 0 else 0 + + return { + "job_id": job_id, + "status": meta.get("status", "unknown"), + "profile": meta.get("profile"), + "cid_count": int(meta.get("cid_count", 0)), + "domains": meta.get("domains", "").split(",") if meta.get("domains") else [], + "progress": meta.get("progress", ""), + "created_at": created_at, + "elapsed_seconds": round(elapsed, 1), + "error": meta.get("error") or None, + } + + +def get_job_result( + job_id: str, + domain: Optional[str] = None, + offset: int = 0, + limit: int = 0, +) -> Optional[Dict[str, Any]]: + """Get completed job result from Redis. + + Supports chunked storage (new) and legacy single-key storage. 
+ Supports optional domain filtering and pagination via offset/limit. + """ + conn = get_redis_client() + if conn is None: + return None + + # Try chunked storage first + raw_meta = conn.get(_result_meta_key(job_id)) + if raw_meta is not None: + return _get_chunked_result(conn, job_id, raw_meta, domain, offset, limit) + + # Fall back to legacy single-key storage + raw = conn.get(_result_key(job_id)) + if raw is None: + return None + + result = json.loads(raw) + + if domain and "results" in result: + domain_data = result["results"].get(domain) + if domain_data is None: + return result + rows = domain_data.get("data", []) + if offset > 0: + rows = rows[offset:] + if limit > 0: + rows = rows[:limit] + result["results"] = { + domain: {"data": rows, "count": len(rows), "total": domain_data.get("count", 0)}, + } + + return result + + +def _get_chunked_result( + conn, + job_id: str, + raw_meta: str, + domain: Optional[str] = None, + offset: int = 0, + limit: int = 0, +) -> Dict[str, Any]: + """Reconstruct result from chunked Redis keys.""" + meta = json.loads(raw_meta) + domain_info = meta.get("domains", {}) + + results: Dict[str, Any] = {} + target_domains = [domain] if domain else list(domain_info.keys()) + + for d in target_domains: + info = domain_info.get(d) + if info is None: + continue + num_chunks = info.get("chunks", 0) + total = info.get("total", 0) + + rows: List[Dict[str, Any]] = [] + for i in range(num_chunks): + raw_chunk = conn.get(_result_chunk_key(job_id, d, i)) + if raw_chunk is not None: + rows.extend(json.loads(raw_chunk)) + + if offset > 0: + rows = rows[offset:] + if limit > 0: + rows = rows[:limit] + + results[d] = {"data": rows, "count": len(rows), "total": total} + + aggregation = None + raw_agg = conn.get(_result_aggregation_key(job_id)) + if raw_agg is not None: + aggregation = json.loads(raw_agg) + + result: Dict[str, Any] = { + "stage": "events", + "results": results, + "aggregation": aggregation, + } + + if meta.get("failed_domains"): + result["error"] = "one or more domains failed" + result["code"] = "EVENTS_PARTIAL_FAILURE" + result["failed_domains"] = meta["failed_domains"] + + return result + + +# --------------------------------------------------------------------------- +# NDJSON streaming generator +# --------------------------------------------------------------------------- +def stream_job_result_ndjson(job_id: str): + """Generator yielding NDJSON lines for a completed job's chunked result. + + Each line is a JSON object followed by ``\\n``. The protocol: + + 1. ``{"type":"meta", ...}`` + 2. For each domain: + - ``{"type":"domain_start", "domain":"...", "total":N}`` + - ``{"type":"records", "domain":"...", "batch":i, "count":N, "data":[...]}`` + - ``{"type":"domain_end", "domain":"...", "count":N}`` + 3. ``{"type":"aggregation", "data":{...}}`` (if present) + 4. 
``{"type":"complete", "total_records":N}`` + """ + conn = get_redis_client() + if conn is None: + yield _ndjson_line({"type": "error", "message": "Redis unavailable"}) + return + + raw_meta = conn.get(_result_meta_key(job_id)) + if raw_meta is None: + # Fall back: legacy single-key result → emit as single NDJSON blob + raw = conn.get(_result_key(job_id)) + if raw is None: + yield _ndjson_line({"type": "error", "message": "result not found"}) + return + result = json.loads(raw) + yield _ndjson_line({"type": "full_result", "data": result}) + return + + meta = json.loads(raw_meta) + domain_info = meta.get("domains", {}) + + yield _ndjson_line({ + "type": "meta", + "job_id": job_id, + "profile": meta.get("profile"), + "domains": list(domain_info.keys()), + }) + + total_records = 0 + for domain_name, info in domain_info.items(): + num_chunks = info.get("chunks", 0) + domain_total = info.get("total", 0) + + yield _ndjson_line({ + "type": "domain_start", + "domain": domain_name, + "total": domain_total, + }) + + domain_count = 0 + for i in range(num_chunks): + raw_chunk = conn.get(_result_chunk_key(job_id, domain_name, i)) + if raw_chunk is None: + continue + rows = json.loads(raw_chunk) + domain_count += len(rows) + yield _ndjson_line({ + "type": "records", + "domain": domain_name, + "batch": i, + "count": len(rows), + "data": rows, + }) + + yield _ndjson_line({ + "type": "domain_end", + "domain": domain_name, + "count": domain_count, + }) + total_records += domain_count + + raw_agg = conn.get(_result_aggregation_key(job_id)) + if raw_agg is not None: + yield _ndjson_line({ + "type": "aggregation", + "data": json.loads(raw_agg), + }) + + if meta.get("failed_domains"): + yield _ndjson_line({ + "type": "warning", + "code": "EVENTS_PARTIAL_FAILURE", + "failed_domains": meta["failed_domains"], + }) + + yield _ndjson_line({ + "type": "complete", + "total_records": total_records, + }) + + +def _ndjson_line(obj: Dict[str, Any]) -> str: + return json.dumps(obj, default=str, ensure_ascii=False) + "\n" + + +# --------------------------------------------------------------------------- +# Chunked result storage +# --------------------------------------------------------------------------- +def _store_chunked_result( + conn, + job_id: str, + profile: str, + results: Dict[str, Dict[str, Any]], + aggregation: Optional[Dict[str, Any]], + failed_domains: List[str], +) -> None: + """Store job result as chunked Redis keys for streaming retrieval.""" + domain_info: Dict[str, Dict[str, Any]] = {} + + for domain_name, domain_data in results.items(): + rows = domain_data.get("data", []) + total = len(rows) + chunks = [ + rows[i:i + TRACE_STREAM_BATCH_SIZE] + for i in range(0, max(len(rows), 1), TRACE_STREAM_BATCH_SIZE) + ] if rows else [] + + for idx, chunk in enumerate(chunks): + conn.setex( + _result_chunk_key(job_id, domain_name, idx), + TRACE_JOB_TTL_SECONDS, + json.dumps(chunk, default=str, ensure_ascii=False), + ) + + domain_info[domain_name] = {"chunks": len(chunks), "total": total} + + if aggregation is not None: + conn.setex( + _result_aggregation_key(job_id), + TRACE_JOB_TTL_SECONDS, + json.dumps(aggregation, default=str, ensure_ascii=False), + ) + + result_meta = { + "profile": profile, + "domains": domain_info, + "failed_domains": sorted(failed_domains) if failed_domains else [], + } + conn.setex( + _result_meta_key(job_id), + TRACE_JOB_TTL_SECONDS, + json.dumps(result_meta, default=str, ensure_ascii=False), + ) + + +# --------------------------------------------------------------------------- +# Worker 
entry point (runs in RQ worker process) +# --------------------------------------------------------------------------- +def _update_meta(job_id: str, **fields) -> None: + """Update job metadata fields in Redis.""" + conn = get_redis_client() + if conn is None: + return + key = _meta_key(job_id) + conn.hset(key, mapping={k: str(v) for k, v in fields.items()}) + + +def execute_trace_events_job( + job_id: str, + profile: str, + container_ids: List[str], + domains: List[str], + payload: Dict[str, Any], +) -> None: + """RQ worker entry point: execute trace events and store result in Redis. + + This function runs in a dedicated RQ worker process — outside gunicorn — + so it does not compete for gunicorn worker threads or the DB connection pool. + """ + from mes_dashboard.services.event_fetcher import EventFetcher + + logger.info( + "trace job started job_id=%s profile=%s cid_count=%s domains=%s", + job_id, profile, len(container_ids), ",".join(domains), + ) + + _update_meta(job_id, status="started", progress="fetching events") + + try: + results: Dict[str, Dict[str, Any]] = {} + raw_domain_results: Dict[str, Dict[str, List[Dict[str, Any]]]] = {} + failed_domains: List[str] = [] + + is_msd = (profile == "mid_section_defect") + + with ThreadPoolExecutor( + max_workers=min(len(domains), TRACE_EVENTS_MAX_WORKERS), + ) as executor: + futures = { + executor.submit(EventFetcher.fetch_events, container_ids, domain): domain + for domain in domains + } + for future in as_completed(futures): + domain = futures[future] + try: + events_by_cid = future.result() + rows = _flatten_domain_records(events_by_cid) + results[domain] = {"data": rows, "count": len(rows)} + if is_msd: + raw_domain_results[domain] = events_by_cid + except Exception as exc: + logger.error( + "trace job domain failed job_id=%s domain=%s: %s", + job_id, domain, exc, exc_info=True, + ) + failed_domains.append(domain) + + _update_meta(job_id, progress="building response") + + aggregation = None + if is_msd: + aggregation, agg_error = _build_job_msd_aggregation(payload, raw_domain_results) + del raw_domain_results + if agg_error is not None: + raise RuntimeError(agg_error) + else: + del raw_domain_results + + # Store result in Redis as chunked keys for streaming retrieval + conn = get_redis_client() + if conn is not None: + _store_chunked_result( + conn, job_id, profile, results, aggregation, failed_domains, + ) + + _update_meta( + job_id, + status="finished", + progress="complete", + completed_at=time.time(), + ) + + logger.info( + "trace job completed job_id=%s profile=%s domains=%s", + job_id, profile, ",".join(domains), + ) + + if len(container_ids) > 10000: + gc.collect() + + except Exception as exc: + logger.error( + "trace job failed job_id=%s: %s", job_id, exc, exc_info=True, + ) + _update_meta( + job_id, + status="failed", + error=str(exc), + completed_at=time.time(), + ) + raise + + +# --------------------------------------------------------------------------- +# Helpers (duplicated from trace_routes to avoid Flask dependency in worker) +# --------------------------------------------------------------------------- +def _flatten_domain_records( + events_by_cid: Dict[str, List[Dict[str, Any]]], +) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + for records in events_by_cid.values(): + if not isinstance(records, list): + continue + for row in records: + if isinstance(row, dict): + rows.append(row) + return rows + + +def _build_job_msd_aggregation( + payload: Dict[str, Any], + domain_results: Dict[str, Dict[str, 
List[Dict[str, Any]]]], +) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: + """Build MSD aggregation inside worker process (no Flask context).""" + from mes_dashboard.services.mid_section_defect_service import ( + build_trace_aggregation_from_events, + parse_loss_reasons_param, + ) + + params = payload.get("params") + if not isinstance(params, dict): + return None, "params is required for mid_section_defect profile" + + mode = str(params.get("mode") or "date_range").strip() + + start_date = None + end_date = None + if mode != "container": + date_range = params.get("date_range") + if isinstance(date_range, list) and len(date_range) == 2: + start_date = str(date_range[0] or "").strip() + end_date = str(date_range[1] or "").strip() + if not start_date or not end_date: + start_date = str(params.get("start_date") or "").strip() + end_date = str(params.get("end_date") or "").strip() + if not start_date or not end_date: + return None, "start_date/end_date is required in params" + + raw_loss_reasons = params.get("loss_reasons") + loss_reasons = parse_loss_reasons_param(raw_loss_reasons) + + lineage = payload.get("lineage") or {} + lineage_ancestors = lineage.get("ancestors") if isinstance(lineage, dict) else None + lineage_roots = lineage.get("seed_roots") if isinstance(lineage, dict) else None + + seed_container_ids = payload.get("seed_container_ids", []) + if not seed_container_ids and isinstance(lineage_ancestors, dict): + seed_container_ids = list(lineage_ancestors.keys()) + seed_container_ids = [ + s.strip() for s in seed_container_ids + if isinstance(s, str) and s.strip() + ] + + upstream_events = domain_results.get("upstream_history", {}) + materials_events = domain_results.get("materials", {}) + downstream_events = domain_results.get("downstream_rejects", {}) + station = str(params.get("station") or "測試").strip() + direction = str(params.get("direction") or "backward").strip() + + aggregation = build_trace_aggregation_from_events( + start_date, + end_date, + loss_reasons=loss_reasons, + seed_container_ids=seed_container_ids, + lineage_ancestors=lineage_ancestors, + lineage_roots=lineage_roots, + upstream_events_by_cid=upstream_events, + materials_events_by_cid=materials_events, + downstream_events_by_cid=downstream_events, + station=station, + direction=direction, + mode=mode, + ) + if aggregation is None: + return None, "aggregation service unavailable" + if "error" in aggregation: + return None, str(aggregation["error"]) + return aggregation, None diff --git a/tests/test_database_slow_iter.py b/tests/test_database_slow_iter.py new file mode 100644 index 0000000..b0c346c --- /dev/null +++ b/tests/test_database_slow_iter.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +"""Unit tests for read_sql_df_slow_iter (fetchmany iterator).""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import mes_dashboard.core.database as db + + +@patch.object(db, "oracledb") +@patch.object(db, "_get_slow_query_semaphore") +@patch.object(db, "get_db_runtime_config") +def test_slow_iter_yields_batches(mock_runtime, mock_sem_fn, mock_oracledb): + """read_sql_df_slow_iter should yield (columns, rows) batches via fetchmany.""" + mock_runtime.return_value = { + "slow_call_timeout_ms": 60000, + "slow_fetchmany_size": 2, + "tcp_connect_timeout": 10, + "retry_count": 1, + "retry_delay": 1.0, + } + + sem = MagicMock() + sem.acquire.return_value = True + mock_sem_fn.return_value = sem + + cursor = MagicMock() + cursor.description = [("COL_A",), ("COL_B",)] + 
cursor.fetchmany.side_effect = [ + [("r1a", "r1b"), ("r2a", "r2b")], + [("r3a", "r3b")], + [], + ] + + conn = MagicMock() + conn.cursor.return_value = cursor + mock_oracledb.connect.return_value = conn + + batches = list(db.read_sql_df_slow_iter("SELECT 1", {"p0": "x"}, batch_size=2)) + + assert len(batches) == 2 + assert batches[0] == (["COL_A", "COL_B"], [("r1a", "r1b"), ("r2a", "r2b")]) + assert batches[1] == (["COL_A", "COL_B"], [("r3a", "r3b")]) + cursor.fetchmany.assert_called_with(2) + conn.close.assert_called_once() + sem.release.assert_called_once() + + +@patch.object(db, "oracledb") +@patch.object(db, "_get_slow_query_semaphore") +@patch.object(db, "get_db_runtime_config") +def test_slow_iter_empty_result(mock_runtime, mock_sem_fn, mock_oracledb): + """read_sql_df_slow_iter should yield nothing for empty result.""" + mock_runtime.return_value = { + "slow_call_timeout_ms": 60000, + "slow_fetchmany_size": 5000, + "tcp_connect_timeout": 10, + "retry_count": 1, + "retry_delay": 1.0, + } + + sem = MagicMock() + sem.acquire.return_value = True + mock_sem_fn.return_value = sem + + cursor = MagicMock() + cursor.description = [("ID",)] + cursor.fetchmany.return_value = [] + + conn = MagicMock() + conn.cursor.return_value = cursor + mock_oracledb.connect.return_value = conn + + batches = list(db.read_sql_df_slow_iter("SELECT 1")) + + assert batches == [] + conn.close.assert_called_once() + sem.release.assert_called_once() + + +@patch.object(db, "oracledb") +@patch.object(db, "_get_slow_query_semaphore") +@patch.object(db, "get_db_runtime_config") +def test_slow_iter_releases_on_error(mock_runtime, mock_sem_fn, mock_oracledb): + """Semaphore and connection should be released even on error.""" + mock_runtime.return_value = { + "slow_call_timeout_ms": 60000, + "slow_fetchmany_size": 5000, + "tcp_connect_timeout": 10, + "retry_count": 1, + "retry_delay": 1.0, + } + + sem = MagicMock() + sem.acquire.return_value = True + mock_sem_fn.return_value = sem + + conn = MagicMock() + conn.cursor.side_effect = RuntimeError("cursor failed") + mock_oracledb.connect.return_value = conn + + try: + list(db.read_sql_df_slow_iter("SELECT 1")) + except RuntimeError: + pass + + conn.close.assert_called_once() + sem.release.assert_called_once() + + +def test_runtime_config_includes_fetchmany_size(): + """get_db_runtime_config should include slow_fetchmany_size.""" + # Force refresh to pick up current config + db._DB_RUNTIME_CONFIG = None + runtime = db.get_db_runtime_config(refresh=True) + assert "slow_fetchmany_size" in runtime + assert isinstance(runtime["slow_fetchmany_size"], int) + assert runtime["slow_fetchmany_size"] > 0 diff --git a/tests/test_event_fetcher.py b/tests/test_event_fetcher.py index 6cefc77..bd9082e 100644 --- a/tests/test_event_fetcher.py +++ b/tests/test_event_fetcher.py @@ -5,11 +5,21 @@ from __future__ import annotations from unittest.mock import patch -import pandas as pd - from mes_dashboard.services.event_fetcher import EventFetcher +def _iter_result(columns, rows): + """Helper: create a generator that yields a single (columns, rows) batch.""" + def _gen(*args, **kwargs): + yield columns, rows + return _gen + + +def _iter_empty(*args, **kwargs): + """Helper: generator that yields nothing (empty result).""" + return iter([]) + + def test_cache_key_is_stable_for_sorted_ids(): key1 = EventFetcher._cache_key("history", ["CID-B", "CID-A", "CID-A"]) key2 = EventFetcher._cache_key("history", ["CID-A", "CID-B"]) @@ -29,98 +39,96 @@ def 
test_get_rate_limit_config_supports_env_override(monkeypatch): assert config["window_seconds"] == 77 -@patch("mes_dashboard.services.event_fetcher.read_sql_df") +@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter") @patch("mes_dashboard.services.event_fetcher.cache_get") -def test_fetch_events_cache_hit_skips_db(mock_cache_get, mock_read_sql_df): +def test_fetch_events_cache_hit_skips_db(mock_cache_get, mock_iter): mock_cache_get.return_value = {"CID-1": [{"CONTAINERID": "CID-1"}]} result = EventFetcher.fetch_events(["CID-1"], "materials") assert result["CID-1"][0]["CONTAINERID"] == "CID-1" - mock_read_sql_df.assert_not_called() + mock_iter.assert_not_called() @patch("mes_dashboard.services.event_fetcher.cache_set") @patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None) -@patch("mes_dashboard.services.event_fetcher.read_sql_df") +@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter") @patch("mes_dashboard.services.event_fetcher.SQLLoader.load_with_params") def test_fetch_events_upstream_history_branch( mock_sql_load, - mock_read_sql_df, + mock_iter, _mock_cache_get, mock_cache_set, ): mock_sql_load.return_value = "SELECT * FROM UPSTREAM" - mock_read_sql_df.return_value = pd.DataFrame( - [ - {"CONTAINERID": "CID-1", "WORKCENTER_GROUP": "DB"}, - {"CONTAINERID": "CID-2", "WORKCENTER_GROUP": "WB"}, - ] + mock_iter.side_effect = _iter_result( + ["CONTAINERID", "WORKCENTER_GROUP"], + [("CID-1", "DB"), ("CID-2", "WB")], ) result = EventFetcher.fetch_events(["CID-1", "CID-2"], "upstream_history") assert sorted(result.keys()) == ["CID-1", "CID-2"] assert mock_sql_load.call_args.args[0] == "mid_section_defect/upstream_history" - _, params = mock_read_sql_df.call_args.args - assert len(params) == 2 + sql_arg, params_arg = mock_iter.call_args.args + assert len(params_arg) == 2 mock_cache_set.assert_called_once() assert mock_cache_set.call_args.args[0].startswith("evt:upstream_history:") @patch("mes_dashboard.services.event_fetcher.cache_set") @patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None) -@patch("mes_dashboard.services.event_fetcher.read_sql_df") +@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter") @patch("mes_dashboard.services.event_fetcher.SQLLoader.load") def test_fetch_events_history_branch_replaces_container_filter( mock_sql_load, - mock_read_sql_df, + mock_iter, _mock_cache_get, _mock_cache_set, ): mock_sql_load.return_value = ( "SELECT * FROM t WHERE h.CONTAINERID = :container_id {{ WORKCENTER_FILTER }}" ) - mock_read_sql_df.return_value = pd.DataFrame([]) + mock_iter.side_effect = _iter_empty EventFetcher.fetch_events(["CID-1"], "history") - sql, params = mock_read_sql_df.call_args.args - assert "h.CONTAINERID = :container_id" not in sql - assert "{{ WORKCENTER_FILTER }}" not in sql - assert params == {"p0": "CID-1"} + sql_arg, params_arg = mock_iter.call_args.args + assert "h.CONTAINERID = :container_id" not in sql_arg + assert "{{ WORKCENTER_FILTER }}" not in sql_arg + assert params_arg == {"p0": "CID-1"} @patch("mes_dashboard.services.event_fetcher.cache_set") @patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None) -@patch("mes_dashboard.services.event_fetcher.read_sql_df") +@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter") @patch("mes_dashboard.services.event_fetcher.SQLLoader.load") def test_fetch_events_materials_branch_replaces_aliased_container_filter( mock_sql_load, - mock_read_sql_df, + mock_iter, _mock_cache_get, _mock_cache_set, ): 
mock_sql_load.return_value = ( "SELECT * FROM t m WHERE m.CONTAINERID = :container_id ORDER BY TXNDATE" ) - mock_read_sql_df.return_value = pd.DataFrame([]) + mock_iter.side_effect = _iter_empty EventFetcher.fetch_events(["CID-1", "CID-2"], "materials") - sql, params = mock_read_sql_df.call_args.args - assert "m.CONTAINERID = :container_id" not in sql - assert "IN" in sql.upper() - assert params == {"p0": "CID-1", "p1": "CID-2"} + sql_arg, params_arg = mock_iter.call_args.args + assert "m.CONTAINERID = :container_id" not in sql_arg + assert "IN" in sql_arg.upper() + assert params_arg == {"p0": "CID-1", "p1": "CID-2"} @patch("mes_dashboard.services.event_fetcher.cache_set") @patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None) -@patch("mes_dashboard.services.event_fetcher.read_sql_df") +@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter") @patch("mes_dashboard.services.event_fetcher.SQLLoader.load") def test_fetch_events_rejects_branch_replaces_aliased_container_filter( mock_sql_load, - mock_read_sql_df, + mock_iter, _mock_cache_get, _mock_cache_set, ): @@ -128,23 +136,23 @@ def test_fetch_events_rejects_branch_replaces_aliased_container_filter( "SELECT * FROM t r LEFT JOIN c ON c.CONTAINERID = r.CONTAINERID " "WHERE r.CONTAINERID = :container_id ORDER BY r.TXNDATE" ) - mock_read_sql_df.return_value = pd.DataFrame([]) + mock_iter.side_effect = _iter_empty EventFetcher.fetch_events(["CID-1", "CID-2"], "rejects") - sql, params = mock_read_sql_df.call_args.args - assert "r.CONTAINERID = :container_id" not in sql - assert "IN" in sql.upper() - assert params == {"p0": "CID-1", "p1": "CID-2"} + sql_arg, params_arg = mock_iter.call_args.args + assert "r.CONTAINERID = :container_id" not in sql_arg + assert "IN" in sql_arg.upper() + assert params_arg == {"p0": "CID-1", "p1": "CID-2"} @patch("mes_dashboard.services.event_fetcher.cache_set") @patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None) -@patch("mes_dashboard.services.event_fetcher.read_sql_df") +@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter") @patch("mes_dashboard.services.event_fetcher.SQLLoader.load") def test_fetch_events_holds_branch_replaces_aliased_container_filter( mock_sql_load, - mock_read_sql_df, + mock_iter, _mock_cache_get, _mock_cache_set, ): @@ -152,19 +160,41 @@ def test_fetch_events_holds_branch_replaces_aliased_container_filter( "SELECT * FROM t h LEFT JOIN c ON c.CONTAINERID = h.CONTAINERID " "WHERE h.CONTAINERID = :container_id ORDER BY h.HOLDTXNDATE DESC" ) - mock_read_sql_df.return_value = pd.DataFrame([]) + mock_iter.side_effect = _iter_empty EventFetcher.fetch_events(["CID-1", "CID-2"], "holds") - sql, params = mock_read_sql_df.call_args.args - assert "h.CONTAINERID = :container_id" not in sql - assert "IN" in sql.upper() - assert params == {"p0": "CID-1", "p1": "CID-2"} + sql_arg, params_arg = mock_iter.call_args.args + assert "h.CONTAINERID = :container_id" not in sql_arg + assert "IN" in sql_arg.upper() + assert params_arg == {"p0": "CID-1", "p1": "CID-2"} -def test_event_fetcher_uses_slow_connection(): - """Regression: event_fetcher must use read_sql_df_slow (non-pooled).""" +def test_event_fetcher_uses_slow_iter_connection(): + """Regression: event_fetcher must use read_sql_df_slow_iter (non-pooled).""" import mes_dashboard.services.event_fetcher as ef - from mes_dashboard.core.database import read_sql_df_slow + from mes_dashboard.core.database import read_sql_df_slow_iter - assert ef.read_sql_df is read_sql_df_slow + assert 
ef.read_sql_df_slow_iter is read_sql_df_slow_iter + + +@patch("mes_dashboard.services.event_fetcher.cache_set") +@patch("mes_dashboard.services.event_fetcher.cache_get", return_value=None) +@patch("mes_dashboard.services.event_fetcher.read_sql_df_slow_iter") +@patch("mes_dashboard.services.event_fetcher.SQLLoader.load_with_params") +def test_fetch_events_sanitizes_nan_values( + mock_sql_load, + mock_iter, + _mock_cache_get, + _mock_cache_set, +): + """NaN float values in records should be replaced with None.""" + mock_sql_load.return_value = "SELECT * FROM UPSTREAM" + mock_iter.side_effect = _iter_result( + ["CONTAINERID", "VALUE"], + [("CID-1", float("nan"))], + ) + + result = EventFetcher.fetch_events(["CID-1"], "upstream_history") + + assert result["CID-1"][0]["VALUE"] is None diff --git a/tests/test_trace_job_service.py b/tests/test_trace_job_service.py new file mode 100644 index 0000000..9dd1c7a --- /dev/null +++ b/tests/test_trace_job_service.py @@ -0,0 +1,507 @@ +# -*- coding: utf-8 -*- +"""Unit tests for trace_job_service (async trace job queue).""" + +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +import mes_dashboard.services.trace_job_service as tjs + + +# --------------------------------------------------------------------------- +# is_async_available +# --------------------------------------------------------------------------- +def test_is_async_available_true(): + """Should return True when rq is importable and Redis is up.""" + tjs._RQ_AVAILABLE = None # reset cached flag + with patch.object(tjs, "get_redis_client", return_value=MagicMock()): + assert tjs.is_async_available() is True + + +def test_is_async_available_false_no_redis(): + """Should return False when Redis is unavailable.""" + tjs._RQ_AVAILABLE = True + with patch.object(tjs, "get_redis_client", return_value=None): + assert tjs.is_async_available() is False + + +# --------------------------------------------------------------------------- +# enqueue_trace_events_job +# --------------------------------------------------------------------------- +@patch.object(tjs, "_get_rq_queue") +@patch.object(tjs, "get_redis_client") +def test_enqueue_success(mock_redis, mock_queue_fn): + """Enqueue should return a job_id and store metadata in Redis.""" + conn = MagicMock() + mock_redis.return_value = conn + + queue = MagicMock() + mock_queue_fn.return_value = queue + + job_id, err = tjs.enqueue_trace_events_job( + "query_tool", ["CID-1", "CID-2"], ["history"], {"params": {}}, + ) + + assert job_id is not None + assert job_id.startswith("trace-evt-") + assert err is None + queue.enqueue.assert_called_once() + conn.hset.assert_called_once() + conn.expire.assert_called_once() + + +@patch.object(tjs, "_get_rq_queue", return_value=None) +def test_enqueue_no_queue(mock_queue_fn): + """Enqueue should return error when queue is unavailable.""" + job_id, err = tjs.enqueue_trace_events_job( + "query_tool", ["CID-1"], ["history"], {}, + ) + + assert job_id is None + assert "unavailable" in err + + +@patch.object(tjs, "_get_rq_queue") +@patch.object(tjs, "get_redis_client") +def test_enqueue_queue_error(mock_redis, mock_queue_fn): + """Enqueue should return error when queue.enqueue raises.""" + conn = MagicMock() + mock_redis.return_value = conn + + queue = MagicMock() + queue.enqueue.side_effect = RuntimeError("connection refused") + mock_queue_fn.return_value = queue + + job_id, err = tjs.enqueue_trace_events_job( + "query_tool", ["CID-1"], ["history"], {}, + ) + + 
assert job_id is None + assert "connection refused" in err + # Meta key should be cleaned up + conn.delete.assert_called_once() + + +# --------------------------------------------------------------------------- +# get_job_status +# --------------------------------------------------------------------------- +@patch.object(tjs, "get_redis_client") +def test_get_job_status_found(mock_redis): + """Should return status dict from Redis hash.""" + conn = MagicMock() + conn.hgetall.return_value = { + "profile": "query_tool", + "cid_count": "100", + "domains": "history,materials", + "status": "started", + "progress": "fetching events", + "created_at": "1740000000.0", + "completed_at": "", + "error": "", + } + mock_redis.return_value = conn + + status = tjs.get_job_status("trace-evt-abc123") + + assert status is not None + assert status["job_id"] == "trace-evt-abc123" + assert status["status"] == "started" + assert status["cid_count"] == 100 + assert status["domains"] == ["history", "materials"] + assert status["error"] is None + + +@patch.object(tjs, "get_redis_client") +def test_get_job_status_not_found(mock_redis): + """Should return None when job metadata does not exist.""" + conn = MagicMock() + conn.hgetall.return_value = {} + mock_redis.return_value = conn + + assert tjs.get_job_status("trace-evt-nonexistent") is None + + +# --------------------------------------------------------------------------- +# get_job_result +# --------------------------------------------------------------------------- +@patch.object(tjs, "get_redis_client") +def test_get_job_result_found_chunked(mock_redis): + """Should return reconstructed result from chunked Redis keys.""" + result_meta = { + "profile": "query_tool", + "domains": {"history": {"chunks": 1, "total": 1}}, + "failed_domains": [], + } + chunk_data = [{"CONTAINERID": "CID-1"}] + + conn = MagicMock() + + def _get_side_effect(key): + if key.endswith(":result:meta"): + return json.dumps(result_meta) + if key.endswith(":result:history:0"): + return json.dumps(chunk_data) + if key.endswith(":result:aggregation"): + return None + return None + + conn.get.side_effect = _get_side_effect + mock_redis.return_value = conn + + result = tjs.get_job_result("trace-evt-abc123") + + assert result is not None + assert result["stage"] == "events" + assert result["results"]["history"]["count"] == 1 + assert result["results"]["history"]["total"] == 1 + + +@patch.object(tjs, "get_redis_client") +def test_get_job_result_found_legacy(mock_redis): + """Should fall back to legacy single-key result when no chunked meta exists.""" + result_data = { + "stage": "events", + "results": {"history": {"data": [{"CONTAINERID": "CID-1"}], "count": 1}}, + "aggregation": None, + } + conn = MagicMock() + + def _get_side_effect(key): + if key.endswith(":result:meta"): + return None # no chunked meta + if key.endswith(":result"): + return json.dumps(result_data) + return None + + conn.get.side_effect = _get_side_effect + mock_redis.return_value = conn + + result = tjs.get_job_result("trace-evt-abc123") + + assert result is not None + assert result["stage"] == "events" + assert result["results"]["history"]["count"] == 1 + + +@patch.object(tjs, "get_redis_client") +def test_get_job_result_not_found(mock_redis): + """Should return None when result key does not exist.""" + conn = MagicMock() + conn.get.return_value = None + mock_redis.return_value = conn + + assert tjs.get_job_result("trace-evt-expired") is None + + +@patch.object(tjs, "get_redis_client") +def 
test_get_job_result_with_domain_filter(mock_redis): + """Should return filtered result with pagination from chunked storage.""" + result_meta = { + "profile": "query_tool", + "domains": { + "history": {"chunks": 1, "total": 3}, + "materials": {"chunks": 1, "total": 1}, + }, + "failed_domains": [], + } + history_chunk = [{"id": 1}, {"id": 2}, {"id": 3}] + materials_chunk = [{"id": 10}] + + conn = MagicMock() + + def _get_side_effect(key): + if key.endswith(":result:meta"): + return json.dumps(result_meta) + if key.endswith(":result:history:0"): + return json.dumps(history_chunk) + if key.endswith(":result:materials:0"): + return json.dumps(materials_chunk) + if key.endswith(":result:aggregation"): + return None + return None + + conn.get.side_effect = _get_side_effect + mock_redis.return_value = conn + + result = tjs.get_job_result("trace-evt-abc", domain="history", offset=1, limit=1) + + assert "history" in result["results"] + assert "materials" not in result["results"] + assert result["results"]["history"]["data"] == [{"id": 2}] + assert result["results"]["history"]["total"] == 3 + + +# --------------------------------------------------------------------------- +# execute_trace_events_job (worker entry point) +# --------------------------------------------------------------------------- +@patch.object(tjs, "get_redis_client") +@patch("mes_dashboard.services.event_fetcher.EventFetcher.fetch_events") +def test_execute_job_success(mock_fetch, mock_redis): + """Worker should fetch events, store result, and update meta to finished.""" + mock_fetch.return_value = {"CID-1": [{"CONTAINERID": "CID-1"}]} + + conn = MagicMock() + mock_redis.return_value = conn + + tjs.execute_trace_events_job( + "test-job-1", "query_tool", ["CID-1"], ["history"], {}, + ) + + mock_fetch.assert_called_once_with(["CID-1"], "history") + + # Result should be stored as chunked keys (chunk + result meta) + setex_calls = [c for c in conn.method_calls if c[0] == "setex"] + assert len(setex_calls) == 2 # 1 chunk + 1 result meta + + # Find the result meta setex call + result_meta_call = [c for c in setex_calls if ":result:meta" in str(c)] + assert len(result_meta_call) == 1 + stored_meta = json.loads(result_meta_call[0][1][2]) + assert "history" in stored_meta["domains"] + assert stored_meta["domains"]["history"]["total"] == 1 + + # Find the chunk setex call + chunk_call = [c for c in setex_calls if ":result:history:0" in str(c)] + assert len(chunk_call) == 1 + stored_chunk = json.loads(chunk_call[0][1][2]) + assert len(stored_chunk) == 1 + assert stored_chunk[0]["CONTAINERID"] == "CID-1" + + # Job meta should be updated to finished + hset_calls = [c for c in conn.method_calls if c[0] == "hset"] + last_meta = hset_calls[-1][2]["mapping"] + assert last_meta["status"] == "finished" + + +@patch.object(tjs, "get_redis_client") +@patch("mes_dashboard.services.event_fetcher.EventFetcher.fetch_events") +def test_execute_job_domain_failure_records_partial(mock_fetch, mock_redis): + """Domain fetch failure should result in partial failure, not job crash.""" + mock_fetch.side_effect = RuntimeError("db timeout") + + conn = MagicMock() + mock_redis.return_value = conn + + tjs.execute_trace_events_job( + "test-job-2", "query_tool", ["CID-1"], ["history"], {}, + ) + + # Result meta should still be stored (with failed_domains) + setex_calls = [c for c in conn.method_calls if c[0] == "setex"] + assert len(setex_calls) == 1 # only result meta (no chunks since domain failed) + stored_meta = json.loads(setex_calls[0][1][2]) + assert "history" in 
stored_meta["failed_domains"] + + # Job meta should still be finished (partial failure is not a job crash) + hset_calls = [c for c in conn.method_calls if c[0] == "hset"] + last_meta = hset_calls[-1][2]["mapping"] + assert last_meta["status"] == "finished" + + +# --------------------------------------------------------------------------- +# _store_chunked_result +# --------------------------------------------------------------------------- +@patch.object(tjs, "get_redis_client") +def test_store_chunked_result_splits_batches(mock_redis): + """Large domain data should be split into multiple chunks.""" + conn = MagicMock() + mock_redis.return_value = conn + + # 12 records, batch size 5 → 3 chunks (5+5+2) + rows = [{"id": i} for i in range(12)] + results = {"history": {"data": rows, "count": 12}} + + original_batch_size = tjs.TRACE_STREAM_BATCH_SIZE + tjs.TRACE_STREAM_BATCH_SIZE = 5 + try: + tjs._store_chunked_result(conn, "job-1", "query_tool", results, None, []) + finally: + tjs.TRACE_STREAM_BATCH_SIZE = original_batch_size + + setex_calls = [c for c in conn.method_calls if c[0] == "setex"] + # 3 chunk keys + 1 result meta = 4 + assert len(setex_calls) == 4 + + # Verify result meta + meta_call = [c for c in setex_calls if ":result:meta" in str(c)] + assert len(meta_call) == 1 + meta = json.loads(meta_call[0][1][2]) + assert meta["domains"]["history"]["chunks"] == 3 + assert meta["domains"]["history"]["total"] == 12 + + # Verify chunks + chunk_calls = [c for c in setex_calls if ":result:history:" in str(c)] + assert len(chunk_calls) == 3 + chunk_0 = json.loads(chunk_calls[0][1][2]) + assert len(chunk_0) == 5 + + +@patch.object(tjs, "get_redis_client") +def test_store_chunked_result_with_aggregation(mock_redis): + """Aggregation should be stored in a separate key.""" + conn = MagicMock() + mock_redis.return_value = conn + + results = {"history": {"data": [{"id": 1}], "count": 1}} + aggregation = {"summary": {"total": 100}} + + tjs._store_chunked_result(conn, "job-1", "mid_section_defect", results, aggregation, []) + + setex_calls = [c for c in conn.method_calls if c[0] == "setex"] + agg_call = [c for c in setex_calls if ":result:aggregation" in str(c)] + assert len(agg_call) == 1 + stored_agg = json.loads(agg_call[0][1][2]) + assert stored_agg["summary"]["total"] == 100 + + +# --------------------------------------------------------------------------- +# stream_job_result_ndjson +# --------------------------------------------------------------------------- +@patch.object(tjs, "get_redis_client") +def test_stream_ndjson_chunked(mock_redis): + """NDJSON stream should yield correct protocol lines for chunked result.""" + result_meta = { + "profile": "query_tool", + "domains": {"history": {"chunks": 2, "total": 3}}, + "failed_domains": [], + } + chunk_0 = [{"id": 1}, {"id": 2}] + chunk_1 = [{"id": 3}] + + conn = MagicMock() + + def _get_side_effect(key): + if key.endswith(":result:meta"): + return json.dumps(result_meta) + if key.endswith(":result:history:0"): + return json.dumps(chunk_0) + if key.endswith(":result:history:1"): + return json.dumps(chunk_1) + if key.endswith(":result:aggregation"): + return None + return None + + conn.get.side_effect = _get_side_effect + mock_redis.return_value = conn + + lines = list(tjs.stream_job_result_ndjson("job-1")) + parsed = [json.loads(line) for line in lines] + + types = [p["type"] for p in parsed] + assert types == ["meta", "domain_start", "records", "records", "domain_end", "complete"] + + assert parsed[0]["domains"] == ["history"] + assert 
parsed[1]["domain"] == "history" + assert parsed[1]["total"] == 3 + assert parsed[2]["count"] == 2 + assert parsed[3]["count"] == 1 + assert parsed[4]["count"] == 3 + assert parsed[5]["total_records"] == 3 + + +@patch.object(tjs, "get_redis_client") +def test_stream_ndjson_with_aggregation(mock_redis): + """NDJSON stream should include aggregation line when present.""" + result_meta = { + "profile": "mid_section_defect", + "domains": {"upstream_history": {"chunks": 1, "total": 1}}, + "failed_domains": [], + } + conn = MagicMock() + + def _get_side_effect(key): + if key.endswith(":result:meta"): + return json.dumps(result_meta) + if key.endswith(":result:upstream_history:0"): + return json.dumps([{"id": 1}]) + if key.endswith(":result:aggregation"): + return json.dumps({"summary": "ok"}) + return None + + conn.get.side_effect = _get_side_effect + mock_redis.return_value = conn + + lines = list(tjs.stream_job_result_ndjson("job-1")) + parsed = [json.loads(line) for line in lines] + + types = [p["type"] for p in parsed] + assert "aggregation" in types + + agg_line = [p for p in parsed if p["type"] == "aggregation"][0] + assert agg_line["data"]["summary"] == "ok" + + +@patch.object(tjs, "get_redis_client") +def test_stream_ndjson_legacy_fallback(mock_redis): + """NDJSON stream should emit full_result for legacy single-key storage.""" + legacy_result = { + "stage": "events", + "results": {"history": {"data": [{"id": 1}], "count": 1}}, + "aggregation": None, + } + conn = MagicMock() + + def _get_side_effect(key): + if key.endswith(":result:meta"): + return None # no chunked meta + if key.endswith(":result"): + return json.dumps(legacy_result) + return None + + conn.get.side_effect = _get_side_effect + mock_redis.return_value = conn + + lines = list(tjs.stream_job_result_ndjson("job-1")) + parsed = [json.loads(line) for line in lines] + + assert len(parsed) == 1 + assert parsed[0]["type"] == "full_result" + assert parsed[0]["data"]["stage"] == "events" + + +@patch.object(tjs, "get_redis_client") +def test_stream_ndjson_with_failed_domains(mock_redis): + """NDJSON stream should include warning line for partial failures.""" + result_meta = { + "profile": "query_tool", + "domains": {"materials": {"chunks": 1, "total": 1}}, + "failed_domains": ["history"], + } + conn = MagicMock() + + def _get_side_effect(key): + if key.endswith(":result:meta"): + return json.dumps(result_meta) + if key.endswith(":result:materials:0"): + return json.dumps([{"id": 1}]) + if key.endswith(":result:aggregation"): + return None + return None + + conn.get.side_effect = _get_side_effect + mock_redis.return_value = conn + + lines = list(tjs.stream_job_result_ndjson("job-1")) + parsed = [json.loads(line) for line in lines] + + types = [p["type"] for p in parsed] + assert "warning" in types + + warning = [p for p in parsed if p["type"] == "warning"][0] + assert warning["code"] == "EVENTS_PARTIAL_FAILURE" + assert "history" in warning["failed_domains"] + + +# --------------------------------------------------------------------------- +# _flatten_domain_records +# --------------------------------------------------------------------------- +def test_flatten_domain_records(): + events_by_cid = { + "CID-1": [{"CONTAINERID": "CID-1", "EVT": "A"}], + "CID-2": [{"CONTAINERID": "CID-2", "EVT": "B"}, {"CONTAINERID": "CID-2", "EVT": "C"}], + } + rows = tjs._flatten_domain_records(events_by_cid) + assert len(rows) == 3 diff --git a/tests/test_trace_routes.py b/tests/test_trace_routes.py index c105ffe..d0ab255 100644 --- 
a/tests/test_trace_routes.py +++ b/tests/test_trace_routes.py @@ -482,3 +482,343 @@ def test_non_msd_events_cache_unchanged(mock_cache_set, mock_cache_get, mock_fet assert payload['stage'] == 'events' # EventFetcher should NOT have been called — served from cache mock_fetch_events.assert_not_called() + + +# ---- Admission control tests ---- + + +def test_events_non_msd_cid_limit_returns_413(monkeypatch): + """Non-MSD profile exceeding CID limit should return 413.""" + monkeypatch.setattr( + 'mes_dashboard.routes.trace_routes.TRACE_EVENTS_CID_LIMIT', 5, + ) + + client = _client() + response = client.post( + '/api/trace/events', + json={ + 'profile': 'query_tool', + 'container_ids': [f'CID-{i}' for i in range(10)], + 'domains': ['history'], + }, + ) + + assert response.status_code == 413 + payload = response.get_json() + assert payload['error']['code'] == 'CID_LIMIT_EXCEEDED' + + +@patch('mes_dashboard.routes.trace_routes.build_trace_aggregation_from_events') +@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events') +def test_events_msd_bypasses_cid_limit( + mock_fetch_events, + mock_build_aggregation, + monkeypatch, +): + """MSD profile should bypass CID limit and proceed normally.""" + monkeypatch.setattr( + 'mes_dashboard.routes.trace_routes.TRACE_EVENTS_CID_LIMIT', 5, + ) + mock_fetch_events.return_value = { + f'CID-{i}': [{'CONTAINERID': f'CID-{i}', 'WORKCENTER_GROUP': '測試'}] + for i in range(10) + } + mock_build_aggregation.return_value = { + 'kpi': {'total_input': 10}, + 'charts': {}, + 'daily_trend': [], + 'available_loss_reasons': [], + 'genealogy_status': 'ready', + 'detail_total_count': 0, + } + + client = _client() + response = client.post( + '/api/trace/events', + json={ + 'profile': 'mid_section_defect', + 'container_ids': [f'CID-{i}' for i in range(10)], + 'domains': ['upstream_history'], + 'params': { + 'start_date': '2025-01-01', + 'end_date': '2025-01-31', + }, + 'lineage': {'ancestors': {}}, + 'seed_container_ids': [f'CID-{i}' for i in range(10)], + }, + ) + + assert response.status_code == 200 + payload = response.get_json() + assert payload['stage'] == 'events' + mock_fetch_events.assert_called() + + +@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events') +def test_events_non_msd_within_limit_proceeds(mock_fetch_events): + """Non-MSD profile within CID limit should proceed normally.""" + mock_fetch_events.return_value = { + 'CID-001': [{'CONTAINERID': 'CID-001', 'EVENTTYPE': 'TRACK_IN'}] + } + + client = _client() + response = client.post( + '/api/trace/events', + json={ + 'profile': 'query_tool', + 'container_ids': ['CID-001'], + 'domains': ['history'], + }, + ) + + assert response.status_code == 200 + payload = response.get_json() + assert payload['stage'] == 'events' + + +# ---- Async job queue tests ---- + + +@patch('mes_dashboard.routes.trace_routes.enqueue_trace_events_job') +@patch('mes_dashboard.routes.trace_routes.is_async_available', return_value=True) +def test_events_routes_to_async_above_threshold( + mock_async_avail, + mock_enqueue, + monkeypatch, +): + """CID count > async threshold should return 202 with job_id.""" + monkeypatch.setattr( + 'mes_dashboard.routes.trace_routes.TRACE_ASYNC_CID_THRESHOLD', 5, + ) + mock_enqueue.return_value = ('trace-evt-abc123', None) + + client = _client() + response = client.post( + '/api/trace/events', + json={ + 'profile': 'query_tool', + 'container_ids': [f'CID-{i}' for i in range(10)], + 'domains': ['history'], + }, + ) + + assert response.status_code == 202 + payload = response.get_json() + 
assert payload['async'] is True
+    assert payload['job_id'] == 'trace-evt-abc123'
+    assert '/api/trace/job/trace-evt-abc123' in payload['status_url']
+
+
+@patch('mes_dashboard.routes.trace_routes.EventFetcher.fetch_events')
+@patch('mes_dashboard.routes.trace_routes.is_async_available', return_value=False)
+def test_events_falls_back_to_sync_when_async_unavailable(
+    mock_async_avail,
+    mock_fetch_events,
+    monkeypatch,
+):
+    """When async is unavailable, should fall through to sync processing."""
+    monkeypatch.setattr(
+        'mes_dashboard.routes.trace_routes.TRACE_ASYNC_CID_THRESHOLD', 2,
+    )
+    mock_fetch_events.return_value = {
+        f'CID-{i}': [{'CONTAINERID': f'CID-{i}'}]
+        for i in range(3)
+    }
+
+    client = _client()
+    response = client.post(
+        '/api/trace/events',
+        json={
+            'profile': 'query_tool',
+            'container_ids': [f'CID-{i}' for i in range(3)],
+            'domains': ['history'],
+        },
+    )
+
+    assert response.status_code == 200
+    payload = response.get_json()
+    assert payload['stage'] == 'events'
+    mock_fetch_events.assert_called()
+
+
+@patch('mes_dashboard.routes.trace_routes.enqueue_trace_events_job')
+@patch('mes_dashboard.routes.trace_routes.is_async_available', return_value=True)
+def test_events_falls_back_to_413_when_enqueue_fails(
+    mock_async_avail,
+    mock_enqueue,
+    monkeypatch,
+):
+    """When enqueue fails for a non-MSD profile, should fall back to 413."""
+    monkeypatch.setattr(
+        'mes_dashboard.routes.trace_routes.TRACE_ASYNC_CID_THRESHOLD', 3,
+    )
+    monkeypatch.setattr(
+        'mes_dashboard.routes.trace_routes.TRACE_EVENTS_CID_LIMIT', 5,
+    )
+    mock_enqueue.return_value = (None, 'Redis down')
+
+    client = _client()
+    response = client.post(
+        '/api/trace/events',
+        json={
+            'profile': 'query_tool',
+            'container_ids': [f'CID-{i}' for i in range(10)],
+            'domains': ['history'],
+        },
+    )
+
+    assert response.status_code == 413
+    payload = response.get_json()
+    assert payload['error']['code'] == 'CID_LIMIT_EXCEEDED'
+
+
+# ---- Job status/result endpoint tests ----
+
+
+@patch('mes_dashboard.routes.trace_routes.get_job_status')
+def test_job_status_found(mock_status):
+    """GET /api/trace/job/<job_id> should return the job status."""
+    mock_status.return_value = {
+        'job_id': 'trace-evt-abc',
+        'status': 'started',
+        'profile': 'query_tool',
+        'cid_count': 100,
+        'domains': ['history'],
+        'progress': 'fetching events',
+        'created_at': 1740000000.0,
+        'elapsed_seconds': 15.0,
+        'error': None,
+    }
+
+    client = _client()
+    response = client.get('/api/trace/job/trace-evt-abc')
+
+    assert response.status_code == 200
+    payload = response.get_json()
+    assert payload['status'] == 'started'
+    assert payload['job_id'] == 'trace-evt-abc'
+
+
+@patch('mes_dashboard.routes.trace_routes.get_job_status', return_value=None)
+def test_job_status_not_found(mock_status):
+    """GET /api/trace/job/<job_id> should return 404 for an unknown job."""
+    client = _client()
+    response = client.get('/api/trace/job/trace-evt-nonexist')
+
+    assert response.status_code == 404
+
+
+@patch('mes_dashboard.routes.trace_routes.get_job_result')
+@patch('mes_dashboard.routes.trace_routes.get_job_status')
+def test_job_result_success(mock_status, mock_result):
+    """GET /api/trace/job/<job_id>/result should return the result for a finished job."""
+    mock_status.return_value = {'status': 'finished', 'job_id': 'j1'}
+    mock_result.return_value = {
+        'stage': 'events',
+        'results': {'history': {'data': [], 'count': 0}},
+        'aggregation': None,
+    }
+
+    client = _client()
+    response = client.get('/api/trace/job/j1/result')
+
+    assert response.status_code == 200
+    payload = response.get_json()
+    assert payload['stage'] == 'events'
+
+
+@patch('mes_dashboard.routes.trace_routes.get_job_status')
+def test_job_result_not_complete(mock_status):
+    """GET /api/trace/job/<job_id>/result should return 409 for a non-finished job."""
+    mock_status.return_value = {'status': 'started', 'job_id': 'j2'}
+
+    client = _client()
+    response = client.get('/api/trace/job/j2/result')
+
+    assert response.status_code == 409
+    payload = response.get_json()
+    assert payload['error']['code'] == 'JOB_NOT_COMPLETE'
+
+
+@patch('mes_dashboard.routes.trace_routes.get_job_result', return_value=None)
+@patch('mes_dashboard.routes.trace_routes.get_job_status')
+def test_job_result_expired(mock_status, mock_result):
+    """GET /api/trace/job/<job_id>/result should return 404 if the result expired."""
+    mock_status.return_value = {'status': 'finished', 'job_id': 'j3'}
+
+    client = _client()
+    response = client.get('/api/trace/job/j3/result')
+
+    assert response.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+# NDJSON stream endpoint
+# ---------------------------------------------------------------------------
+@patch("mes_dashboard.routes.trace_routes.stream_job_result_ndjson")
+@patch("mes_dashboard.routes.trace_routes.get_job_status")
+def test_job_stream_success(mock_status, mock_stream):
+    """GET /api/trace/job/<job_id>/stream should return NDJSON for a finished job."""
+    mock_status.return_value = {'status': 'finished', 'job_id': 'j1'}
+    mock_stream.return_value = iter([
+        '{"type":"meta","job_id":"j1","domains":["history"]}\n',
+        '{"type":"complete","total_records":0}\n',
+    ])
+
+    reset_rate_limits_for_tests()
+    client = _client()
+    response = client.get('/api/trace/job/j1/stream')
+
+    assert response.status_code == 200
+    assert response.content_type.startswith('application/x-ndjson')
+
+    lines = [l for l in response.data.decode().strip().split('\n') if l.strip()]
+    assert len(lines) == 2
+
+
+@patch("mes_dashboard.routes.trace_routes.get_job_status")
+def test_job_stream_not_found(mock_status):
+    """GET /api/trace/job/<job_id>/stream should return 404 for a missing job."""
+    mock_status.return_value = None
+
+    reset_rate_limits_for_tests()
+    client = _client()
+    response = client.get('/api/trace/job/j-missing/stream')
+
+    assert response.status_code == 404
+
+
+@patch("mes_dashboard.routes.trace_routes.get_job_status")
+def test_job_stream_not_complete(mock_status):
+    """GET /api/trace/job/<job_id>/stream should return 409 for an incomplete job."""
+    mock_status.return_value = {'status': 'started', 'job_id': 'j2'}
+
+    reset_rate_limits_for_tests()
+    client = _client()
+    response = client.get('/api/trace/job/j2/stream')
+
+    assert response.status_code == 409
+
+
+# ---------------------------------------------------------------------------
+# 202 response includes stream_url
+# ---------------------------------------------------------------------------
+@patch("mes_dashboard.routes.trace_routes.is_async_available", return_value=True)
+@patch("mes_dashboard.routes.trace_routes.enqueue_trace_events_job")
+def test_events_async_response_includes_stream_url(mock_enqueue, mock_async):
+    """Events 202 response should include a stream_url field."""
+    mock_enqueue.return_value = ("trace-evt-xyz", None)
+
+    reset_rate_limits_for_tests()
+    client = _client()
+    cids = [f"CID-{i}" for i in range(25000)]
+    response = client.post('/api/trace/events', json={
+        'profile': 'query_tool',
+        'container_ids': cids,
+        'domains': ['history'],
+    })
+
+    assert response.status_code == 202
+    data = response.get_json()
data["stream_url"] == "/api/trace/job/trace-evt-xyz/stream" + assert data["status_url"] == "/api/trace/job/trace-evt-xyz"