Files
DashBoard/tests/e2e/test_trace_pipeline_e2e.py
egg c6f982ae50 test: fix E2E/integration failures, add trace pipeline E2E, remove dead migration tests
- Fix 6 Playwright strict-mode violations in query tool E2E (v-show dual-tab selectors)
- Update 5 resource history E2E tests for POST /query API restructure
- Add 22 trace pipeline E2E tests: admission control, async job queue, NDJSON streaming
- Fix 3 health endpoint tests: add circuit breaker + route cache mocks
- Fix WIP integration tests: load .env before DB module import for --run-integration
- Remove 4 dead migration test files (20 permanently-skipped tests)

Final: 1101 unit + 10 integration + 121 E2E + 23 stress = 1255 passed, 0 failed

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 08:33:07 +08:00

729 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""E2E tests for trace pipeline: memory triage, async job queue, NDJSON streaming.
Tests the three core features implemented in the trace pipeline proposals:
1. Memory triage — admission control, CID limits, MSD bypass
2. Async job queue — RQ-based async routing, job lifecycle
3. NDJSON streaming — chunked Redis storage, streaming protocol
Run with: pytest tests/e2e/test_trace_pipeline_e2e.py -v --run-e2e
"""
import json
import os
import time
import uuid
import pytest
import redis
import requests
# Every test in this module is an E2E test (run via --run-e2e, per docstring).
pytestmark = [pytest.mark.e2e]

# Redis connection target and key namespace — overridable via environment
# so the suite can point at a CI or local instance.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
REDIS_KEY_PREFIX = os.getenv("REDIS_KEY_PREFIX", "mes_wip")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _post_events(base_url, profile, container_ids, domains=None, timeout=60):
    """POST a trace-events request and return the raw HTTP response.

    ``domains`` is only included in the payload when it is a non-empty
    list, so the server-side default domain set applies otherwise.
    """
    body = {"profile": profile, "container_ids": container_ids}
    if domains:
        body["domains"] = domains
    url = f"{base_url}/api/trace/events"
    return requests.post(url, json=body, timeout=timeout)
def _resolve_cids(base_url, work_order):
    """Resolve real container IDs from a work order via live API.

    Returns an empty list on any non-200 response so callers can skip
    instead of erroring out when live data is unavailable.
    """
    response = requests.post(
        f"{base_url}/api/query-tool/resolve",
        json={"input_type": "work_order", "values": [work_order]},
        timeout=30,
    )
    if response.status_code != 200:
        return []
    resolved = []
    # Rows may expose the container id under either key depending on the
    # backing query — accept both spellings.
    for lot in response.json().get("data", []):
        cid = lot.get("container_id") or lot.get("CONTAINERID")
        if cid:
            resolved.append(str(cid))
    return resolved
def _get_redis():
    """Get a direct Redis client for seeding test data.

    decode_responses=True so values read back are ``str`` rather than
    ``bytes``, matching the comparisons done in the tests below.
    """
    return redis.from_url(REDIS_URL, decode_responses=True)
def _key(suffix):
    """Namespace a Redis key suffix with the configured prefix."""
    return REDIS_KEY_PREFIX + ":" + suffix
def _seed_completed_job(r, job_id, profile, domain_data, aggregation=None,
                        failed_domains=None, batch_size=3):
    """Seed Redis with a completed job's chunked result for streaming tests.

    Args:
        r: Redis client
        job_id: Job identifier
        profile: Profile name
        domain_data: dict of {domain_name: [list of record dicts]}
        aggregation: optional aggregation dict
        failed_domains: list of failed domain names
        batch_size: records per chunk (small for testing)
    """
    ttl = 300  # 5 min TTL so stray test keys self-expire

    # Job meta hash — status "finished" so stream/result endpoints accept it.
    meta_key = _key(f"trace:job:{job_id}:meta")
    r.hset(meta_key, mapping={
        "profile": profile,
        "cid_count": "100",
        "domains": ",".join(domain_data.keys()),
        "status": "finished",
        "progress": "done",
        "created_at": str(time.time() - 10),
        "completed_at": str(time.time()),
        "error": "",
    })
    r.expire(meta_key, ttl)

    # Chunked result storage: one JSON-encoded list per batch of records.
    domain_info = {}
    for domain_name, rows in domain_data.items():
        if rows:
            chunks = [rows[pos:pos + batch_size]
                      for pos in range(0, len(rows), batch_size)]
        else:
            chunks = []
        for chunk_idx, chunk in enumerate(chunks):
            chunk_key = _key(
                f"trace:job:{job_id}:result:{domain_name}:{chunk_idx}")
            r.setex(chunk_key, ttl, json.dumps(chunk))
        domain_info[domain_name] = {"chunks": len(chunks), "total": len(rows)}

    # Optional aggregation blob (used by the MSD profile tests).
    if aggregation is not None:
        agg_key = _key(f"trace:job:{job_id}:result:aggregation")
        r.setex(agg_key, ttl, json.dumps(aggregation))

    # Result meta describes the chunk layout and any failed domains.
    result_meta = {
        "profile": profile,
        "domains": domain_info,
        "failed_domains": sorted(failed_domains) if failed_domains else [],
    }
    result_meta_key = _key(f"trace:job:{job_id}:result:meta")
    r.setex(result_meta_key, ttl, json.dumps(result_meta))
def _cleanup_job(r, job_id):
    """Remove all Redis keys for a test job."""
    # scan_iter avoids KEYS on a shared instance; collect before deleting.
    matched = list(r.scan_iter(_key(f"trace:job:{job_id}:*")))
    if matched:
        r.delete(*matched)
def _parse_ndjson(response_text):
"""Parse NDJSON response text into list of dicts."""
lines = []
for line in response_text.strip().split("\n"):
line = line.strip()
if line:
lines.append(json.loads(line))
return lines
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def base(app_server):
    # Short alias for the shared app_server fixture (base URL of the app
    # under test); module scope so all tests here reuse one server handle.
    return app_server
@pytest.fixture(scope="module")
def real_cids(base):
    """Resolve real CIDs from a known work order."""
    # "GA26010001" is presumably a work order known to exist in the target
    # environment — TODO confirm it stays valid. Resolved once per module.
    cids = _resolve_cids(base, "GA26010001")
    if not cids:
        # Skip (not fail): without live data the pipeline cannot be exercised.
        pytest.skip("No container IDs resolved — cannot test trace pipeline")
    return cids
@pytest.fixture(scope="module")
def rclient():
    """Direct Redis client for seeding/cleanup."""
    r = _get_redis()
    try:
        # Probe connectivity up front so unreachable Redis skips the
        # Redis-backed tests instead of erroring each one.
        r.ping()
    except redis.ConnectionError:
        pytest.skip("Redis not available")
    return r
# ===========================================================================
# 1. Memory Triage — Admission Control
# ===========================================================================
class TestTraceAdmissionControl:
    """Verify admission control: CID limits, profile bypass, validation."""

    def test_sync_response_with_small_cid_set(self, base, real_cids):
        """Small CID count → sync 200 response with actual trace data."""
        small_cids = real_cids[:3]
        resp = _post_events(base, "query_tool", small_cids, domains=["history"])
        assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text[:200]}"
        data = resp.json()
        assert data["stage"] == "events"
        assert "results" in data
        assert "history" in data["results"]
        history = data["results"]["history"]
        assert "data" in history
        assert "count" in history
        assert isinstance(history["data"], list)
        # With real CIDs we should get actual history records
        # (>= 0 rather than > 0: record presence depends on live data).
        assert history["count"] >= 0

    def test_sync_response_data_structure_complete(self, base, real_cids):
        """Sync response has proper domain data structure with count/data keys."""
        resp = _post_events(base, "query_tool", real_cids[:5],
                            domains=["history", "materials"])
        assert resp.status_code == 200
        data = resp.json()
        # Every requested domain must be present and internally consistent.
        for domain in ["history", "materials"]:
            assert domain in data["results"], f"Missing domain '{domain}'"
            d = data["results"][domain]
            assert "data" in d, f"Domain '{domain}' missing 'data'"
            assert "count" in d, f"Domain '{domain}' missing 'count'"
            assert d["count"] == len(d["data"])

    def test_cid_limit_exceeded_non_msd_returns_413_or_202(self, base):
        """Non-MSD profile with > CID_LIMIT → 413 (no async) or 202 (async)."""
        cid_limit = int(os.getenv("TRACE_EVENTS_CID_LIMIT", "50000"))
        # Generate fake CIDs that exceed the limit
        fake_cids = [f"FAKE-{i:06x}" for i in range(cid_limit + 1)]
        resp = _post_events(base, "query_tool", fake_cids, domains=["history"])
        # Either outcome is valid: 413 when async routing is unavailable,
        # 202 when the oversized request is diverted to the job queue.
        assert resp.status_code in (413, 202), (
            f"Expected 413 or 202 for {cid_limit + 1} CIDs, got {resp.status_code}"
        )
        if resp.status_code == 413:
            err = resp.json()["error"]
            assert err["code"] == "CID_LIMIT_EXCEEDED"
            # The limit value should be echoed in the human-readable message.
            assert str(cid_limit) in err["message"]

    def test_msd_profile_bypasses_cid_limit(self, base):
        """MSD profile must NOT return 413 even with > CID_LIMIT CIDs.

        MSD requires all CIDs for accurate aggregation — no hard cutoff.
        With async available, large MSD queries should route to 202.
        Without async, they proceed to sync (may be slow but not rejected).
        """
        async_threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000"))
        # Use just above async threshold — enough to trigger async routing
        # but below CID_LIMIT to keep test fast
        fake_cids = [f"MSD-{i:06x}" for i in range(async_threshold + 1)]
        resp = _post_events(base, "mid_section_defect", fake_cids,
                            domains=["rejects"])
        # Must NOT be 413 — MSD bypasses CID limit
        assert resp.status_code != 413, (
            "MSD profile should NEVER receive 413 CID_LIMIT_EXCEEDED"
        )
        # Should be 202 (async) or 200 (sync fallback)
        assert resp.status_code in (200, 202)

    def test_empty_container_ids_rejected(self, base):
        """Empty container_ids list → 400 INVALID_PARAMS."""
        resp = _post_events(base, "query_tool", [])
        assert resp.status_code == 400
        assert resp.json()["error"]["code"] == "INVALID_PARAMS"

    def test_missing_profile_rejected(self, base):
        """Missing profile field → 400."""
        # Bypass _post_events on purpose: it always supplies "profile".
        resp = requests.post(
            f"{base}/api/trace/events",
            json={"container_ids": ["CID-001"]},
            timeout=10,
        )
        assert resp.status_code == 400

    def test_invalid_domain_rejected(self, base):
        """Invalid domain name → 400 INVALID_PARAMS."""
        resp = _post_events(
            base, "query_tool", ["CID-001"], domains=["nonexistent_domain"],
        )
        assert resp.status_code == 400
        # NOTE(review): substring check ("in") here vs equality elsewhere —
        # presumably tolerates a composite code; confirm against the API.
        assert "INVALID_PARAMS" in resp.json()["error"]["code"]
# ===========================================================================
# 2. Async Job Queue
# ===========================================================================
class TestTraceAsyncJobQueue:
    """Verify async job routing, lifecycle, and result retrieval."""

    def test_async_routing_returns_202_with_correct_format(self, base):
        """Large CID count + async available → 202 with job metadata."""
        threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000"))
        fake_cids = [f"ASYNC-{i:06x}" for i in range(threshold + 1)]
        resp = _post_events(base, "query_tool", fake_cids, domains=["history"])
        if resp.status_code != 202:
            # Without a worker the server falls back to sync — not a failure.
            pytest.skip("Async not available (RQ worker not running)")
        data = resp.json()
        assert data["async"] is True
        assert data["stage"] == "events"
        assert "job_id" in data
        assert data["job_id"].startswith("trace-evt-")
        assert "status_url" in data
        assert "stream_url" in data
        # Both URLs must be derivable from the job_id alone.
        assert data["status_url"] == f"/api/trace/job/{data['job_id']}"
        assert data["stream_url"] == f"/api/trace/job/{data['job_id']}/stream"

    def test_job_status_after_enqueue(self, base):
        """After async enqueue, job status should be queryable."""
        threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000"))
        fake_cids = [f"STATUS-{i:06x}" for i in range(threshold + 1)]
        enqueue_resp = _post_events(base, "query_tool", fake_cids,
                                    domains=["history"])
        if enqueue_resp.status_code != 202:
            pytest.skip("Async not available")
        job_id = enqueue_resp.json()["job_id"]
        status_resp = requests.get(
            f"{base}/api/trace/job/{job_id}", timeout=10,
        )
        assert status_resp.status_code == 200
        status = status_resp.json()
        assert status["job_id"] == job_id
        # Any lifecycle state is acceptable here — the worker may already
        # have started or even failed the job (fake CIDs).
        assert status["status"] in ("queued", "started", "finished", "failed")
        assert status["profile"] == "query_tool"
        assert status["cid_count"] == threshold + 1
        assert "history" in status["domains"]
        assert "elapsed_seconds" in status

    def test_job_lifecycle_poll_until_terminal(self, base):
        """Full lifecycle: enqueue → poll until finished/failed → verify result."""
        threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000"))
        fake_cids = [f"LIFE-{i:06x}" for i in range(threshold + 1)]
        enqueue_resp = _post_events(base, "query_tool", fake_cids,
                                    domains=["history"])
        if enqueue_resp.status_code != 202:
            pytest.skip("Async not available")
        job_id = enqueue_resp.json()["job_id"]
        status_url = f"{base}/api/trace/job/{job_id}"
        # Poll until terminal state (max 120s — fake CIDs will fail fast)
        terminal = False
        final_status = None
        deadline = time.time() + 120
        while time.time() < deadline:
            resp = requests.get(status_url, timeout=10)
            assert resp.status_code == 200
            final_status = resp.json()
            if final_status["status"] in ("finished", "failed"):
                terminal = True
                break
            time.sleep(2)
        assert terminal, f"Job did not reach terminal state within 120s, last: {final_status}"
        # Job with fake CIDs may finish with empty results or fail —
        # either is acceptable. Key is that the lifecycle completed.
        if final_status["status"] == "finished":
            # Result should be retrievable
            result_resp = requests.get(
                f"{base}/api/trace/job/{job_id}/result", timeout=10,
            )
            assert result_resp.status_code == 200
            result = result_resp.json()
            assert result["stage"] == "events"
            assert "results" in result

    def test_job_not_found_returns_404(self, base):
        """Non-existent job → 404."""
        resp = requests.get(
            f"{base}/api/trace/job/trace-evt-nonexistent99", timeout=10,
        )
        assert resp.status_code == 404
        assert resp.json()["error"]["code"] == "JOB_NOT_FOUND"

    def test_job_result_not_found_returns_404(self, base):
        """Non-existent job result → 404."""
        resp = requests.get(
            f"{base}/api/trace/job/trace-evt-nonexistent99/result", timeout=10,
        )
        assert resp.status_code == 404

    def test_job_result_before_completion_returns_409(self, base, rclient):
        """Result request for an in-progress job → 409."""
        # Seed only the meta hash with status "started" — no result chunks —
        # to simulate a job mid-flight without needing a real worker.
        job_id = f"trace-evt-inprogress{uuid.uuid4().hex[:6]}"
        meta_key = _key(f"trace:job:{job_id}:meta")
        rclient.hset(meta_key, mapping={
            "profile": "query_tool",
            "cid_count": "100",
            "domains": "history",
            "status": "started",
            "progress": "fetching",
            "created_at": str(time.time()),
            "completed_at": "",
            "error": "",
        })
        rclient.expire(meta_key, 60)
        try:
            resp = requests.get(
                f"{base}/api/trace/job/{job_id}/result", timeout=10,
            )
            assert resp.status_code == 409
            assert resp.json()["error"]["code"] == "JOB_NOT_COMPLETE"
        finally:
            rclient.delete(meta_key)
# ===========================================================================
# 3. NDJSON Streaming
# ===========================================================================
class TestTraceNDJSONStream:
    """Verify NDJSON streaming endpoint and protocol."""

    def test_stream_not_found_returns_404(self, base):
        """Stream for non-existent job → 404."""
        resp = requests.get(
            f"{base}/api/trace/job/trace-evt-nonexistent99/stream", timeout=10,
        )
        assert resp.status_code == 404

    def test_stream_before_completion_returns_409(self, base, rclient):
        """Stream request for an in-progress job → 409."""
        # Seed only a "started" meta hash — simulates a job still running.
        job_id = f"trace-evt-stream409{uuid.uuid4().hex[:6]}"
        meta_key = _key(f"trace:job:{job_id}:meta")
        rclient.hset(meta_key, mapping={
            "profile": "query_tool",
            "cid_count": "50",
            "domains": "history",
            "status": "started",
            "progress": "fetching",
            "created_at": str(time.time()),
            "completed_at": "",
            "error": "",
        })
        rclient.expire(meta_key, 60)
        try:
            resp = requests.get(
                f"{base}/api/trace/job/{job_id}/stream", timeout=10,
            )
            assert resp.status_code == 409
            data = resp.json()
            assert data["error"]["code"] == "JOB_NOT_COMPLETE"
            # The 409 body should echo the job's current status.
            assert data["status"] == "started"
        finally:
            rclient.delete(meta_key)

    def test_stream_protocol_single_domain(self, base, rclient):
        """Stream a completed job with one domain — verify full NDJSON protocol."""
        job_id = f"trace-evt-stream1d{uuid.uuid4().hex[:6]}"
        records = [{"CID": f"C{i}", "EVENT": f"ev{i}", "TS": "2026-01-01"}
                   for i in range(7)]
        # 7 records at batch_size=3 → chunks of 3, 3, 1.
        _seed_completed_job(rclient, job_id, "query_tool",
                            {"history": records}, batch_size=3)
        try:
            resp = requests.get(
                f"{base}/api/trace/job/{job_id}/stream", timeout=10,
            )
            assert resp.status_code == 200
            assert "application/x-ndjson" in resp.headers["Content-Type"]
            assert resp.headers.get("Cache-Control") == "no-cache"
            lines = _parse_ndjson(resp.text)
            types = [ln["type"] for ln in lines]
            # Protocol: meta → domain_start → records(×3) → domain_end → complete
            assert types[0] == "meta"
            assert types[1] == "domain_start"
            assert types[-2] == "domain_end"
            assert types[-1] == "complete"
            # Verify meta line
            meta = lines[0]
            assert meta["job_id"] == job_id
            assert meta["profile"] == "query_tool"
            assert "history" in meta["domains"]
            # Verify domain_start
            ds = lines[1]
            assert ds["domain"] == "history"
            assert ds["total"] == 7
            # Verify records batches (7 records / batch_size=3 → 3 chunks)
            record_lines = [ln for ln in lines if ln["type"] == "records"]
            assert len(record_lines) == 3  # ceil(7/3) = 3 chunks
            total_streamed = sum(ln["count"] for ln in record_lines)
            assert total_streamed == 7
            # Verify batch indices
            batches = [ln["batch"] for ln in record_lines]
            assert batches == [0, 1, 2]
            # Verify actual data content
            all_records = []
            for ln in record_lines:
                assert ln["domain"] == "history"
                all_records.extend(ln["data"])
            assert len(all_records) == 7
            assert all_records[0]["CID"] == "C0"
            assert all_records[6]["CID"] == "C6"
            # Verify domain_end count matches
            de = [ln for ln in lines if ln["type"] == "domain_end"][0]
            assert de["count"] == 7
            # Verify complete
            complete = lines[-1]
            assert complete["total_records"] == 7
        finally:
            _cleanup_job(rclient, job_id)

    def test_stream_protocol_multi_domain(self, base, rclient):
        """Stream a completed job with multiple domains."""
        job_id = f"trace-evt-streammd{uuid.uuid4().hex[:6]}"
        history = [{"CID": f"H{i}", "EVENT": "hist"} for i in range(5)]
        materials = [{"CID": f"M{i}", "MAT": "mat"} for i in range(4)]
        rejects = [{"CID": f"R{i}", "REJ": "rej"} for i in range(2)]
        _seed_completed_job(rclient, job_id, "query_tool", {
            "history": history,
            "materials": materials,
            "rejects": rejects,
        }, batch_size=3)
        try:
            resp = requests.get(
                f"{base}/api/trace/job/{job_id}/stream", timeout=10,
            )
            assert resp.status_code == 200
            lines = _parse_ndjson(resp.text)
            types = [ln["type"] for ln in lines]
            # Must start with meta and end with complete
            assert types[0] == "meta"
            assert types[-1] == "complete"
            assert set(lines[0]["domains"]) == {"history", "materials", "rejects"}
            # Each domain must have domain_start → records → domain_end sequence
            for domain_name, expected_total in [("history", 5), ("materials", 4), ("rejects", 2)]:
                starts = [ln for ln in lines if ln["type"] == "domain_start" and ln["domain"] == domain_name]
                ends = [ln for ln in lines if ln["type"] == "domain_end" and ln["domain"] == domain_name]
                recs = [ln for ln in lines if ln["type"] == "records" and ln["domain"] == domain_name]
                assert len(starts) == 1, f"Expected 1 domain_start for {domain_name}"
                assert len(ends) == 1, f"Expected 1 domain_end for {domain_name}"
                assert starts[0]["total"] == expected_total
                assert ends[0]["count"] == expected_total
                assert sum(ln["count"] for ln in recs) == expected_total
            # Total records across all domains
            complete = lines[-1]
            assert complete["total_records"] == 11  # 5 + 4 + 2
        finally:
            _cleanup_job(rclient, job_id)

    def test_stream_with_aggregation(self, base, rclient):
        """Stream includes aggregation line for MSD profile."""
        job_id = f"trace-evt-streamagg{uuid.uuid4().hex[:6]}"
        rejects = [{"CID": f"R{i}", "DEFECT": "scratch"} for i in range(4)]
        aggregation = {
            "total_defects": 42,
            "by_category": {"scratch": 30, "crack": 12},
        }
        _seed_completed_job(rclient, job_id, "mid_section_defect",
                            {"rejects": rejects}, aggregation=aggregation,
                            batch_size=5)
        try:
            resp = requests.get(
                f"{base}/api/trace/job/{job_id}/stream", timeout=10,
            )
            assert resp.status_code == 200
            lines = _parse_ndjson(resp.text)
            types = [ln["type"] for ln in lines]
            assert "aggregation" in types
            agg_line = [ln for ln in lines if ln["type"] == "aggregation"][0]
            # Seeded aggregation must round-trip unchanged.
            assert agg_line["data"]["total_defects"] == 42
            assert agg_line["data"]["by_category"]["scratch"] == 30
            # aggregation must come after domain_end and before complete
            agg_idx = types.index("aggregation")
            complete_idx = types.index("complete")
            last_domain_end_idx = max(
                i for i, t in enumerate(types) if t == "domain_end"
            )
            assert last_domain_end_idx < agg_idx < complete_idx
        finally:
            _cleanup_job(rclient, job_id)

    def test_stream_with_failed_domains(self, base, rclient):
        """Stream includes warning line when some domains failed."""
        job_id = f"trace-evt-streamfail{uuid.uuid4().hex[:6]}"
        history = [{"CID": "C1", "EVENT": "ev1"}]
        _seed_completed_job(rclient, job_id, "query_tool",
                            {"history": history},
                            failed_domains=["materials", "rejects"],
                            batch_size=5)
        try:
            resp = requests.get(
                f"{base}/api/trace/job/{job_id}/stream", timeout=10,
            )
            assert resp.status_code == 200
            lines = _parse_ndjson(resp.text)
            types = [ln["type"] for ln in lines]
            assert "warning" in types
            warning = [ln for ln in lines if ln["type"] == "warning"][0]
            assert warning["code"] == "EVENTS_PARTIAL_FAILURE"
            # Compared as a set — order of failed domains is not part of
            # the contract.
            assert set(warning["failed_domains"]) == {"materials", "rejects"}
        finally:
            _cleanup_job(rclient, job_id)

    def test_stream_empty_domain(self, base, rclient):
        """Stream handles domain with zero records gracefully."""
        job_id = f"trace-evt-streamempty{uuid.uuid4().hex[:6]}"
        _seed_completed_job(rclient, job_id, "query_tool",
                            {"history": []}, batch_size=5)
        try:
            resp = requests.get(
                f"{base}/api/trace/job/{job_id}/stream", timeout=10,
            )
            assert resp.status_code == 200
            lines = _parse_ndjson(resp.text)
            types = [ln["type"] for ln in lines]
            # Even an empty domain must be framed by start/end markers.
            assert "domain_start" in types
            assert "domain_end" in types
            ds = [ln for ln in lines if ln["type"] == "domain_start"][0]
            de = [ln for ln in lines if ln["type"] == "domain_end"][0]
            assert ds["total"] == 0
            assert de["count"] == 0
            complete = lines[-1]
            assert complete["total_records"] == 0
        finally:
            _cleanup_job(rclient, job_id)

    def test_stream_content_matches_result_endpoint(self, base, rclient):
        """Stream data must match what GET /result returns."""
        job_id = f"trace-evt-streammatch{uuid.uuid4().hex[:6]}"
        records = [{"CID": f"C{i}", "VAL": i * 10} for i in range(8)]
        _seed_completed_job(rclient, job_id, "query_tool",
                            {"history": records}, batch_size=3)
        try:
            # Get via result endpoint
            result_resp = requests.get(
                f"{base}/api/trace/job/{job_id}/result", timeout=10,
            )
            assert result_resp.status_code == 200
            result_data = result_resp.json()
            # Get via stream endpoint
            stream_resp = requests.get(
                f"{base}/api/trace/job/{job_id}/stream", timeout=10,
            )
            assert stream_resp.status_code == 200
            lines = _parse_ndjson(stream_resp.text)
            # Collect all streamed records
            streamed_records = []
            for ln in lines:
                if ln["type"] == "records" and ln["domain"] == "history":
                    streamed_records.extend(ln["data"])
            # Compare counts
            result_history = result_data["results"]["history"]
            assert len(streamed_records) == result_history["count"]
            # Compare actual data content — both endpoints must serve the
            # same records in the same order.
            assert streamed_records == result_history["data"]
        finally:
            _cleanup_job(rclient, job_id)
# ===========================================================================
# 4. Full Async → Stream End-to-End
# ===========================================================================
class TestTraceAsyncToStream:
    """Full end-to-end: POST events → async 202 → poll → stream NDJSON."""

    def test_full_async_lifecycle_with_stream(self, base, real_cids):
        """Complete flow: real CIDs → async → poll → stream → verify data.

        Uses real CIDs but requires TRACE_ASYNC_CID_THRESHOLD to be low enough
        or enough CIDs. If async not triggered, test sync+seed stream instead.
        """
        threshold = int(os.getenv("TRACE_ASYNC_CID_THRESHOLD", "20000"))
        if len(real_cids) <= threshold:
            # Not enough CIDs to trigger async — test sync path instead
            # and verify stream works for the seeded result
            resp = _post_events(base, "query_tool", real_cids[:10],
                                domains=["history"])
            assert resp.status_code == 200
            data = resp.json()
            assert data["stage"] == "events"
            assert "history" in data["results"]
            # Sync path proven — stream is tested in TestTraceNDJSONStream
            return
        # If we have enough CIDs, test full async lifecycle
        resp = _post_events(base, "query_tool", real_cids, domains=["history"])
        assert resp.status_code == 202
        job_id = resp.json()["job_id"]
        # Poll until finished (180s budget — real CIDs mean real DB work)
        deadline = time.time() + 180
        final_status = None
        while time.time() < deadline:
            status_resp = requests.get(
                f"{base}/api/trace/job/{job_id}", timeout=10,
            )
            final_status = status_resp.json()
            if final_status["status"] in ("finished", "failed"):
                break
            time.sleep(2)
        # Unlike the fake-CID lifecycle test, a failure here is a real bug:
        # real CIDs should always produce a finished job.
        assert final_status["status"] == "finished", (
            f"Job did not finish: {final_status}"
        )
        # Stream the result
        stream_resp = requests.get(
            f"{base}/api/trace/job/{job_id}/stream", timeout=30,
        )
        assert stream_resp.status_code == 200
        lines = _parse_ndjson(stream_resp.text)
        # Verify protocol integrity
        assert lines[0]["type"] == "meta"
        assert lines[-1]["type"] == "complete"
        # Real CIDs must yield at least one record end-to-end.
        assert lines[-1]["total_records"] > 0