"""
|
|
End-to-end tests for dual-track document processing.
|
|
|
|
These tests require:
|
|
- Running backend server
|
|
- Valid user credentials
|
|
- Sample files in demo_docs/
|
|
|
|
Run with: pytest backend/tests/e2e/ -v -s
|
|
"""

import os
import time
from pathlib import Path
from typing import Optional

import pytest
import requests

# Configuration
_default_backend_port = os.getenv("BACKEND_PORT", "8000")
_default_base_url = f"http://localhost:{_default_backend_port}"
_api_base = os.getenv("TOOL_OCR_E2E_API_BASE_URL", _default_base_url).rstrip("/")
API_BASE_URL = f"{_api_base}/api/v2"
DEMO_DOCS_PATH = Path(
    os.getenv("TOOL_OCR_DEMO_DOCS_DIR")
    or (Path(__file__).resolve().parents[3] / "demo_docs")
)

# Test credentials must be provided via environment variables
TEST_USERNAME = os.getenv("TOOL_OCR_E2E_USERNAME")
TEST_PASSWORD = os.getenv("TOOL_OCR_E2E_PASSWORD")
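
# Example environment setup for a local run (illustrative values only; adjust
# to your deployment):
#   export TOOL_OCR_E2E_API_BASE_URL=http://localhost:8000
#   export TOOL_OCR_DEMO_DOCS_DIR=/path/to/demo_docs
#   export TOOL_OCR_E2E_USERNAME=<test-user>
#   export TOOL_OCR_E2E_PASSWORD=<test-password>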
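
# Note: each test class below defines its own auth_token fixture with the same
# login logic. A minimal sketch of a shared session-scoped fixture (assuming it
# lives in a conftest.py next to this file) could replace that duplication:
#
#   @pytest.fixture(scope="session")
#   def auth_token():
#       if not TEST_USERNAME or not TEST_PASSWORD:
#           pytest.skip("Set TOOL_OCR_E2E_USERNAME and TOOL_OCR_E2E_PASSWORD")
#       response = requests.post(
#           f"{API_BASE_URL}/auth/login",
#           json={"username": TEST_USERNAME, "password": TEST_PASSWORD},
#       )
#       if response.status_code != 200:
#           pytest.skip(f"Authentication failed: {response.text}")
#       return response.json()["access_token"]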


class TestDualTrackE2E:
    """End-to-end tests for dual-track processing."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get access token."""
        if not TEST_USERNAME or not TEST_PASSWORD:
            pytest.skip("Set TOOL_OCR_E2E_USERNAME and TOOL_OCR_E2E_PASSWORD to run E2E tests")

        response = requests.post(
            f"{API_BASE_URL}/auth/login",
            json={
                "username": TEST_USERNAME,
                "password": TEST_PASSWORD
            }
        )

        if response.status_code != 200:
            pytest.skip(f"Authentication failed: {response.text}")

        data = response.json()
        return data["access_token"]

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    def wait_for_task_completion(
        self,
        task_id: str,
        headers: dict,
        timeout: int = 120,
        poll_interval: int = 2
    ) -> dict:
        """Wait for task to complete or fail."""
        start_time = time.time()

        while time.time() - start_time < timeout:
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}",
                headers=headers
            )

            if response.status_code != 200:
                raise Exception(f"Failed to get task status: {response.text}")

            task = response.json()
            status = task.get("status")

            if status == "completed":
                return task
            elif status == "failed":
                raise Exception(f"Task failed: {task.get('error_message')}")

            time.sleep(poll_interval)

        raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds")
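
    # Note: timeout and poll_interval are given in seconds; the slower tests
    # below pass timeouts of up to 300 seconds.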

    def upload_and_process(
        self,
        file_path: Path,
        headers: dict,
        force_track: Optional[str] = None
    ) -> dict:
        """Upload file and start processing."""
        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers
            )

        if response.status_code != 200:
            raise Exception(f"Upload failed: {response.text}")

        upload_result = response.json()
        task_id = upload_result["task_id"]

        # Start processing
        params = {"use_dual_track": True}
        if force_track:
            params["force_track"] = force_track

        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params=params
        )

        if response.status_code != 200:
            raise Exception(f"Start processing failed: {response.text}")

        return {"task_id": task_id, "upload_result": upload_result}

    # ===== Test: Editable PDF (Direct Track) =====

    def test_editable_pdf_direct_track(self, headers):
        """Test processing editable PDF through direct track."""
        file_path = DEMO_DOCS_PATH / "edit.pdf"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]

        print(f"\nProcessing editable PDF: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion
        task = self.wait_for_task_completion(task_id, headers)

        # Verify results
        assert task["status"] == "completed"
        assert task.get("processing_track") in ["direct", "ocr"]  # Should be direct

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers
        )

        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")
            print(f"Page Count: {metadata.get('page_count')}")
            print(f"Total Elements: {metadata.get('total_elements')}")

            # Editable PDF should use direct track
            # Note: This may vary based on document characteristics
            assert metadata.get("unified_format") is True

        print("[PASS] Editable PDF processed successfully")

    # ===== Test: Scanned PDF (OCR Track) =====

    def test_scanned_pdf_ocr_track(self, headers):
        """Test processing scanned PDF through OCR track."""
        file_path = DEMO_DOCS_PATH / "scan.pdf"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]

        print(f"\nProcessing scanned PDF: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion (OCR may take longer)
        task = self.wait_for_task_completion(task_id, headers, timeout=180)

        # Verify results
        assert task["status"] == "completed"

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers
        )

        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")
            print(f"Page Count: {metadata.get('page_count')}")
            print(f"Total Text Regions: {metadata.get('total_text_regions')}")
            print(f"Total Tables: {metadata.get('total_tables')}")
            print(f"Total Images: {metadata.get('total_images')}")

            # Scanned PDF should use OCR track
            assert metadata.get("processing_track") == "ocr"
            assert metadata.get("unified_format") is True

        print("[PASS] Scanned PDF processed successfully")

    # ===== Test: Image Files (OCR Track) =====

    @pytest.mark.parametrize("image_file", ["img1.png", "img2.png", "img3.png"])
    def test_image_ocr_track(self, headers, image_file):
        """Test processing image files through OCR track."""
        file_path = DEMO_DOCS_PATH / image_file

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]

        print(f"\nProcessing image: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion
        task = self.wait_for_task_completion(task_id, headers, timeout=120)

        # Verify results
        assert task["status"] == "completed"

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers
        )

        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")

            # Images should use OCR track
            assert metadata.get("processing_track") == "ocr"

        print(f"[PASS] Image {image_file} processed successfully")

    # ===== Test: Office Document (Direct Track) =====

    def test_office_document_direct_track(self, headers):
        """Test processing Office document (PowerPoint)."""
        file_path = DEMO_DOCS_PATH / "ppt.pptx"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]

        print(f"\nProcessing Office document: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion (large Office file needs longer timeout)
        task = self.wait_for_task_completion(task_id, headers, timeout=300)

        # Verify results
        assert task["status"] == "completed"

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers
        )

        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")
            print(f"Page Count: {metadata.get('page_count')}")

            # Office documents should use direct track
            # Note: Current implementation may still use OCR
            assert metadata.get("unified_format") is True

        print("[PASS] Office document processed successfully")


class TestDocumentAnalysis:
    """Test document analysis endpoint."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get access token."""
        if not TEST_USERNAME or not TEST_PASSWORD:
            pytest.skip("Set TOOL_OCR_E2E_USERNAME and TOOL_OCR_E2E_PASSWORD to run E2E tests")

        response = requests.post(
            f"{API_BASE_URL}/auth/login",
            json={
                "username": TEST_USERNAME,
                "password": TEST_PASSWORD
            }
        )

        if response.status_code != 200:
            pytest.skip(f"Authentication failed: {response.text}")

        return response.json()["access_token"]

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    def test_analyze_editable_pdf(self, headers):
        """Test document analysis for editable PDF."""
        file_path = DEMO_DOCS_PATH / "edit.pdf"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers
            )

        if response.status_code != 200:
            pytest.fail(f"Upload failed: {response.text}")

        task_id = response.json()["task_id"]

        # Analyze document (POST method)
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/analyze",
            headers=headers
        )

        if response.status_code != 200:
            pytest.fail(f"Analysis failed: {response.text}")

        analysis = response.json()

        print(f"\nDocument Analysis for: {file_path.name}")
        print(f"Recommended Track: {analysis.get('recommended_track')}")
        print(f"Confidence: {analysis.get('confidence')}")
        print(f"Reason: {analysis.get('reason')}")
        print(f"Is Editable: {analysis.get('is_editable')}")

        # Editable PDF should recommend direct track
        assert analysis.get("recommended_track") == "direct"
        assert analysis.get("is_editable") is True
        assert analysis.get("confidence") >= 0.8

    def test_analyze_scanned_pdf(self, headers):
        """Test document analysis for scanned PDF."""
        file_path = DEMO_DOCS_PATH / "scan.pdf"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers
            )

        if response.status_code != 200:
            pytest.fail(f"Upload failed: {response.text}")

        task_id = response.json()["task_id"]

        # Analyze document (POST method)
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/analyze",
            headers=headers
        )

        if response.status_code != 200:
            pytest.fail(f"Analysis failed: {response.text}")

        analysis = response.json()

        print(f"\nDocument Analysis for: {file_path.name}")
        print(f"Recommended Track: {analysis.get('recommended_track')}")
        print(f"Confidence: {analysis.get('confidence')}")
        print(f"Reason: {analysis.get('reason')}")
        print(f"Is Editable: {analysis.get('is_editable')}")

        # Scanned PDF should recommend OCR track
        assert analysis.get("recommended_track") == "ocr"
        assert analysis.get("is_editable") is False


class TestExportFormats:
    """Test export functionality for processed documents."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get access token."""
        if not TEST_USERNAME or not TEST_PASSWORD:
            pytest.skip("Set TOOL_OCR_E2E_USERNAME and TOOL_OCR_E2E_PASSWORD to run E2E tests")

        response = requests.post(
            f"{API_BASE_URL}/auth/login",
            json={
                "username": TEST_USERNAME,
                "password": TEST_PASSWORD
            }
        )

        if response.status_code != 200:
            pytest.skip(f"Authentication failed: {response.text}")

        return response.json()["access_token"]

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    @pytest.fixture(scope="class")
    def processed_task_id(self, auth_token):
        """Get a completed task for export testing."""
        headers = {"Authorization": f"Bearer {auth_token}"}

        # Upload and process a simple file
        file_path = DEMO_DOCS_PATH / "edit.pdf"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers
            )

        if response.status_code != 200:
            pytest.skip(f"Upload failed: {response.text}")

        task_id = response.json()["task_id"]

        # Start processing
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params={"use_dual_track": True}
        )

        if response.status_code != 200:
            pytest.skip(f"Start processing failed: {response.text}")

        # Wait for completion
        start_time = time.time()
        while time.time() - start_time < 120:
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}",
                headers=headers
            )

            if response.status_code == 200:
                task = response.json()
                if task.get("status") == "completed":
                    return task_id
                elif task.get("status") == "failed":
                    pytest.skip(f"Task failed: {task.get('error_message')}")

            time.sleep(2)

        pytest.skip("Task did not complete in time")

    def test_download_json(self, headers, processed_task_id):
        """Test downloading JSON export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/json",
            headers=headers
        )

        assert response.status_code == 200
        assert "application/json" in response.headers.get("Content-Type", "")

        # Verify it's valid JSON
        data = response.json()
        assert data is not None

        print("\n[PASS] JSON export successful")

    def test_download_markdown(self, headers, processed_task_id):
        """Test downloading Markdown export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/markdown",
            headers=headers
        )

        assert response.status_code == 200

        content = response.text
        assert len(content) > 0

        print(f"\n[PASS] Markdown export successful ({len(content)} chars)")

    def test_download_pdf(self, headers, processed_task_id):
        """Test downloading PDF export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/pdf",
            headers=headers
        )

        assert response.status_code == 200
        assert "application/pdf" in response.headers.get("Content-Type", "")

        # Check PDF magic bytes
        assert response.content[:4] == b"%PDF"

        print(f"\n[PASS] PDF export successful ({len(response.content)} bytes)")

    def test_download_unified(self, headers, processed_task_id):
        """Test downloading UnifiedDocument JSON export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/unified",
            headers=headers
        )

        assert response.status_code == 200

        # Verify UnifiedDocument structure
        data = response.json()
        assert "document_id" in data
        assert "metadata" in data
        assert "pages" in data

        print("\n[PASS] UnifiedDocument export successful")
        print(f" - Document ID: {data.get('document_id')}")
        print(f" - Pages: {len(data.get('pages', []))}")


class TestForceTrack:
    """Test forcing a specific processing track."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get access token."""
        if not TEST_USERNAME or not TEST_PASSWORD:
            pytest.skip("Set TOOL_OCR_E2E_USERNAME and TOOL_OCR_E2E_PASSWORD to run E2E tests")

        response = requests.post(
            f"{API_BASE_URL}/auth/login",
            json={
                "username": TEST_USERNAME,
                "password": TEST_PASSWORD
            }
        )

        if response.status_code != 200:
            pytest.skip(f"Authentication failed: {response.text}")

        return response.json()["access_token"]

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    def wait_for_task(self, task_id, headers, timeout=120):
        """Wait for task completion; returns None on timeout."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}",
                headers=headers
            )
            if response.status_code == 200:
                task = response.json()
                if task.get("status") in ["completed", "failed"]:
                    return task
            time.sleep(2)
        return None

    def test_force_ocr_on_editable_pdf(self, headers):
        """Test forcing OCR track on editable PDF."""
        file_path = DEMO_DOCS_PATH / "edit.pdf"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers
            )

        if response.status_code != 200:
            pytest.fail(f"Upload failed: {response.text}")

        task_id = response.json()["task_id"]

        # Force OCR track
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params={"use_dual_track": True, "force_track": "ocr"}
        )

        assert response.status_code == 200

        print("\nForcing OCR track on editable PDF")
        print(f"Task ID: {task_id}")

        # Wait for completion
        task = self.wait_for_task(task_id, headers, timeout=180)

        assert task is not None
        assert task.get("status") == "completed"

        # Verify OCR track was used
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers
        )

        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            assert metadata.get("processing_track") == "ocr"

        print("[PASS] Force OCR track successful")

    def test_force_direct_on_scanned_pdf(self, headers):
        """Test forcing direct track on scanned PDF (should still work, but with poor results)."""
        file_path = DEMO_DOCS_PATH / "scan.pdf"

        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers
            )

        if response.status_code != 200:
            pytest.fail(f"Upload failed: {response.text}")

        task_id = response.json()["task_id"]

        # Force direct track
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params={"use_dual_track": True, "force_track": "direct"}
        )

        assert response.status_code == 200

        print("\nForcing direct track on scanned PDF")
        print(f"Task ID: {task_id}")

        # Wait for completion
        task = self.wait_for_task(task_id, headers, timeout=120)

        assert task is not None
        # May complete or fail (a scanned PDF has no extractable text)

        if task.get("status") == "completed":
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}/metadata",
                headers=headers
            )

            if response.status_code == 200:
                metadata = response.json()
                print(f"Processing Track: {metadata.get('processing_track')}")
                # Should be direct, as forced
                assert metadata.get("processing_track") == "direct"

        print("[PASS] Force direct track test complete")


if __name__ == "__main__":
    pytest.main([__file__, "-v", "-s"])
|