""" End-to-end tests for dual-track document processing. These tests require: - Running backend server - Valid user credentials - Sample files in demo_docs/ Run with: pytest backend/tests/e2e/ -v -s """ import pytest import requests import time import os from pathlib import Path from typing import Optional # Configuration _default_backend_port = os.getenv("BACKEND_PORT", "8000") _default_base_url = f"http://localhost:{_default_backend_port}" _api_base = os.getenv("TOOL_OCR_E2E_API_BASE_URL", _default_base_url).rstrip("/") API_BASE_URL = f"{_api_base}/api/v2" DEMO_DOCS_PATH = Path( os.getenv("TOOL_OCR_DEMO_DOCS_DIR") or (Path(__file__).resolve().parents[3] / "demo_docs") ) # Test credentials must be provided via environment variables TEST_USERNAME = os.getenv("TOOL_OCR_E2E_USERNAME") TEST_PASSWORD = os.getenv("TOOL_OCR_E2E_PASSWORD") class TestDualTrackE2E: """End-to-end tests for dual-track processing.""" @pytest.fixture(scope="class") def auth_token(self): """Authenticate and get access token.""" if not TEST_USERNAME or not TEST_PASSWORD: pytest.skip("Set TOOL_OCR_E2E_USERNAME and TOOL_OCR_E2E_PASSWORD to run E2E tests") response = requests.post( f"{API_BASE_URL}/auth/login", json={ "username": TEST_USERNAME, "password": TEST_PASSWORD } ) if response.status_code != 200: pytest.skip(f"Authentication failed: {response.text}") data = response.json() return data["access_token"] @pytest.fixture def headers(self, auth_token): """Get authorization headers.""" return {"Authorization": f"Bearer {auth_token}"} def wait_for_task_completion( self, task_id: str, headers: dict, timeout: int = 120, poll_interval: int = 2 ) -> dict: """Wait for task to complete or fail.""" start_time = time.time() while time.time() - start_time < timeout: response = requests.get( f"{API_BASE_URL}/tasks/{task_id}", headers=headers ) if response.status_code != 200: raise Exception(f"Failed to get task status: {response.text}") task = response.json() status = task.get("status") if status == "completed": return task elif status == "failed": raise Exception(f"Task failed: {task.get('error_message')}") time.sleep(poll_interval) raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds") def upload_and_process( self, file_path: Path, headers: dict, force_track: Optional[str] = None ) -> dict: """Upload file and start processing.""" # Upload file with open(file_path, "rb") as f: files = {"file": (file_path.name, f)} response = requests.post( f"{API_BASE_URL}/upload", files=files, headers=headers ) if response.status_code != 200: raise Exception(f"Upload failed: {response.text}") upload_result = response.json() task_id = upload_result["task_id"] # Start processing params = {"use_dual_track": True} if force_track: params["force_track"] = force_track response = requests.post( f"{API_BASE_URL}/tasks/{task_id}/start", headers=headers, params=params ) if response.status_code != 200: raise Exception(f"Start processing failed: {response.text}") return {"task_id": task_id, "upload_result": upload_result} # ===== Test: Editable PDF (Direct Track) ===== def test_editable_pdf_direct_track(self, headers): """Test processing editable PDF through direct track.""" file_path = DEMO_DOCS_PATH / "edit.pdf" if not file_path.exists(): pytest.skip(f"Test file not found: {file_path}") # Upload and process result = self.upload_and_process(file_path, headers) task_id = result["task_id"] print(f"\nProcessing editable PDF: {file_path.name}") print(f"Task ID: {task_id}") # Wait for completion task = 

def _login() -> str:
    """Authenticate against the backend and return an access token.

    Skips the calling test when credentials are missing or login fails.
    """
    if not TEST_USERNAME or not TEST_PASSWORD:
        pytest.skip("Set TOOL_OCR_E2E_USERNAME and TOOL_OCR_E2E_PASSWORD to run E2E tests")
    response = requests.post(
        f"{API_BASE_URL}/auth/login",
        json={"username": TEST_USERNAME, "password": TEST_PASSWORD},
    )
    if response.status_code != 200:
        pytest.skip(f"Authentication failed: {response.text}")
    return response.json()["access_token"]


class TestDualTrackE2E:
    """End-to-end tests for dual-track processing."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get an access token."""
        return _login()

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    def wait_for_task_completion(
        self,
        task_id: str,
        headers: dict,
        timeout: int = 120,
        poll_interval: int = 2,
    ) -> dict:
        """Poll the task endpoint until the task completes or fails."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}",
                headers=headers,
            )
            if response.status_code != 200:
                raise Exception(f"Failed to get task status: {response.text}")
            task = response.json()
            status = task.get("status")
            if status == "completed":
                return task
            elif status == "failed":
                raise Exception(f"Task failed: {task.get('error_message')}")
            time.sleep(poll_interval)
        raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds")

    def upload_and_process(
        self,
        file_path: Path,
        headers: dict,
        force_track: Optional[str] = None,
    ) -> dict:
        """Upload a file and start processing, optionally forcing a track."""
        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers,
            )
        if response.status_code != 200:
            raise Exception(f"Upload failed: {response.text}")
        upload_result = response.json()
        task_id = upload_result["task_id"]

        # Start processing ("direct" or "ocr" when force_track is given)
        params = {"use_dual_track": True}
        if force_track:
            params["force_track"] = force_track
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params=params,
        )
        if response.status_code != 200:
            raise Exception(f"Start processing failed: {response.text}")
        return {"task_id": task_id, "upload_result": upload_result}

    # ===== Test: Editable PDF (Direct Track) =====

    def test_editable_pdf_direct_track(self, headers):
        """Test processing an editable PDF through the direct track."""
        file_path = DEMO_DOCS_PATH / "edit.pdf"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]
        print(f"\nProcessing editable PDF: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion
        task = self.wait_for_task_completion(task_id, headers)

        # Verify results
        assert task["status"] == "completed"
        # Expected to be "direct", but routing may vary by document characteristics
        assert task.get("processing_track") in ["direct", "ocr"]

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers,
        )
        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")
            print(f"Page Count: {metadata.get('page_count')}")
            print(f"Total Elements: {metadata.get('total_elements')}")
            # Editable PDFs should use the direct track, but this may vary based
            # on document characteristics, so only the unified format is asserted.
            assert metadata.get("unified_format") is True

        print("[PASS] Editable PDF processed successfully")

    # ===== Test: Scanned PDF (OCR Track) =====

    def test_scanned_pdf_ocr_track(self, headers):
        """Test processing a scanned PDF through the OCR track."""
        file_path = DEMO_DOCS_PATH / "scan.pdf"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]
        print(f"\nProcessing scanned PDF: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion (OCR may take longer)
        task = self.wait_for_task_completion(task_id, headers, timeout=180)

        # Verify results
        assert task["status"] == "completed"

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers,
        )
        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")
            print(f"Page Count: {metadata.get('page_count')}")
            print(f"Total Text Regions: {metadata.get('total_text_regions')}")
            print(f"Total Tables: {metadata.get('total_tables')}")
            print(f"Total Images: {metadata.get('total_images')}")
            # Scanned PDFs should use the OCR track
            assert metadata.get("processing_track") == "ocr"
            assert metadata.get("unified_format") is True

        print("[PASS] Scanned PDF processed successfully")

    # ===== Test: Image Files (OCR Track) =====

    @pytest.mark.parametrize("image_file", ["img1.png", "img2.png", "img3.png"])
    def test_image_ocr_track(self, headers, image_file):
        """Test processing image files through the OCR track."""
        file_path = DEMO_DOCS_PATH / image_file
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]
        print(f"\nProcessing image: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion
        task = self.wait_for_task_completion(task_id, headers, timeout=120)

        # Verify results
        assert task["status"] == "completed"

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers,
        )
        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")
            # Images should use the OCR track
            assert metadata.get("processing_track") == "ocr"

        print(f"[PASS] Image {image_file} processed successfully")

    # ===== Test: Office Document (Direct Track) =====

    def test_office_document_direct_track(self, headers):
        """Test processing an Office document (PowerPoint)."""
        file_path = DEMO_DOCS_PATH / "ppt.pptx"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload and process
        result = self.upload_and_process(file_path, headers)
        task_id = result["task_id"]
        print(f"\nProcessing Office document: {file_path.name}")
        print(f"Task ID: {task_id}")

        # Wait for completion (a large Office file needs a longer timeout)
        task = self.wait_for_task_completion(task_id, headers, timeout=300)

        # Verify results
        assert task["status"] == "completed"

        # Get processing metadata
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers,
        )
        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            print(f"Processing Time: {metadata.get('processing_time_seconds', 0):.2f}s")
            print(f"Page Count: {metadata.get('page_count')}")
            # Office documents should use the direct track, but the current
            # implementation may still route them to OCR, so only the unified
            # format is asserted here.
            assert metadata.get("unified_format") is True

        print("[PASS] Office document processed successfully")
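
# Endpoint sequence exercised by the tests in this module:
#   POST /api/v2/auth/login                     -> access token
#   POST /api/v2/upload                         -> task_id
#   POST /api/v2/tasks/{task_id}/analyze        -> track recommendation (optional)
#   POST /api/v2/tasks/{task_id}/start          -> begin processing
#   GET  /api/v2/tasks/{task_id}                -> poll status
#   GET  /api/v2/tasks/{task_id}/metadata       -> processing metadata
#   GET  /api/v2/tasks/{task_id}/download/{fmt} -> exports (json, markdown, pdf, unified)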

class TestDocumentAnalysis:
    """Test the document analysis endpoint."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get an access token."""
        return _login()

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    def test_analyze_editable_pdf(self, headers):
        """Test document analysis for an editable PDF."""
        file_path = DEMO_DOCS_PATH / "edit.pdf"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers,
            )
        if response.status_code != 200:
            pytest.fail(f"Upload failed: {response.text}")
        task_id = response.json()["task_id"]

        # Analyze document (POST method)
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/analyze",
            headers=headers,
        )
        if response.status_code != 200:
            pytest.fail(f"Analysis failed: {response.text}")

        analysis = response.json()
        print(f"\nDocument Analysis for: {file_path.name}")
        print(f"Recommended Track: {analysis.get('recommended_track')}")
        print(f"Confidence: {analysis.get('confidence')}")
        print(f"Reason: {analysis.get('reason')}")
        print(f"Is Editable: {analysis.get('is_editable')}")

        # An editable PDF should recommend the direct track
        assert analysis.get("recommended_track") == "direct"
        assert analysis.get("is_editable") is True
        assert analysis.get("confidence") >= 0.8

    def test_analyze_scanned_pdf(self, headers):
        """Test document analysis for a scanned PDF."""
        file_path = DEMO_DOCS_PATH / "scan.pdf"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers,
            )
        if response.status_code != 200:
            pytest.fail(f"Upload failed: {response.text}")
        task_id = response.json()["task_id"]

        # Analyze document (POST method)
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/analyze",
            headers=headers,
        )
        if response.status_code != 200:
            pytest.fail(f"Analysis failed: {response.text}")

        analysis = response.json()
        print(f"\nDocument Analysis for: {file_path.name}")
        print(f"Recommended Track: {analysis.get('recommended_track')}")
        print(f"Confidence: {analysis.get('confidence')}")
        print(f"Reason: {analysis.get('reason')}")
        print(f"Is Editable: {analysis.get('is_editable')}")

        # A scanned PDF should recommend the OCR track
        assert analysis.get("recommended_track") == "ocr"
        assert analysis.get("is_editable") is False
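
# A pre-flight routing helper is a natural use of the analyze endpoint tested
# above. A minimal sketch: the response fields (recommended_track, confidence)
# are those asserted by TestDocumentAnalysis; the 0.8 threshold mirrors
# test_analyze_editable_pdf and is otherwise an assumption.
def _choose_force_track(task_id: str, token: str) -> Optional[str]:
    """Return a force_track value based on the analysis, or None to let the
    backend decide. Illustrative only; not used by the tests."""
    response = requests.post(
        f"{API_BASE_URL}/tasks/{task_id}/analyze",
        headers={"Authorization": f"Bearer {token}"},
    )
    if response.status_code != 200:
        return None
    analysis = response.json()
    # Only force a track when the analysis is confident in its recommendation
    if analysis.get("confidence", 0) >= 0.8:
        return analysis.get("recommended_track")
    return None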

class TestExportFormats:
    """Test export functionality for processed documents."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get an access token."""
        return _login()

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    @pytest.fixture(scope="class")
    def processed_task_id(self, auth_token):
        """Upload and process a file, returning the completed task's ID."""
        headers = {"Authorization": f"Bearer {auth_token}"}

        # Upload a simple file
        file_path = DEMO_DOCS_PATH / "edit.pdf"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers,
            )
        if response.status_code != 200:
            pytest.skip(f"Upload failed: {response.text}")
        task_id = response.json()["task_id"]

        # Start processing
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params={"use_dual_track": True},
        )
        if response.status_code != 200:
            pytest.skip(f"Start processing failed: {response.text}")

        # Wait for completion
        start_time = time.time()
        while time.time() - start_time < 120:
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}",
                headers=headers,
            )
            if response.status_code == 200:
                task = response.json()
                if task.get("status") == "completed":
                    return task_id
                elif task.get("status") == "failed":
                    pytest.skip(f"Task failed: {task.get('error_message')}")
            time.sleep(2)

        pytest.skip("Task did not complete in time")

    def test_download_json(self, headers, processed_task_id):
        """Test downloading the JSON export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/json",
            headers=headers,
        )
        assert response.status_code == 200
        assert "application/json" in response.headers.get("Content-Type", "")

        # Verify it's valid JSON
        data = response.json()
        assert data is not None
        print("\n[PASS] JSON export successful")

    def test_download_markdown(self, headers, processed_task_id):
        """Test downloading the Markdown export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/markdown",
            headers=headers,
        )
        assert response.status_code == 200
        content = response.text
        assert len(content) > 0
        print(f"\n[PASS] Markdown export successful ({len(content)} chars)")

    def test_download_pdf(self, headers, processed_task_id):
        """Test downloading the PDF export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/pdf",
            headers=headers,
        )
        assert response.status_code == 200
        assert "application/pdf" in response.headers.get("Content-Type", "")

        # Check PDF magic bytes
        assert response.content[:4] == b"%PDF"
        print(f"\n[PASS] PDF export successful ({len(response.content)} bytes)")

    def test_download_unified(self, headers, processed_task_id):
        """Test downloading the UnifiedDocument JSON export."""
        response = requests.get(
            f"{API_BASE_URL}/tasks/{processed_task_id}/download/unified",
            headers=headers,
        )
        assert response.status_code == 200

        # Verify UnifiedDocument structure
        data = response.json()
        assert "document_id" in data
        assert "metadata" in data
        assert "pages" in data
        print("\n[PASS] UnifiedDocument export successful")
        print(f"  - Document ID: {data.get('document_id')}")
        print(f"  - Pages: {len(data.get('pages', []))}")
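
# The four download endpoints above share one URL shape. A minimal sketch for
# saving every export of a completed task; the format names are exactly those
# exercised by TestExportFormats, while the file-suffix mapping is an assumption.
def _download_all_exports(task_id: str, token: str, out_dir: Path) -> None:
    """Illustrative helper; not used by the tests."""
    headers = {"Authorization": f"Bearer {token}"}
    suffixes = {"json": "json", "markdown": "md", "pdf": "pdf", "unified": "unified.json"}
    for fmt, suffix in suffixes.items():
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/download/{fmt}",
            headers=headers,
        )
        response.raise_for_status()
        # Write the raw response body; exports are already in their final format
        (out_dir / f"{task_id}.{suffix}").write_bytes(response.content)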

class TestForceTrack:
    """Test forcing a specific processing track."""

    @pytest.fixture(scope="class")
    def auth_token(self):
        """Authenticate and get an access token."""
        return _login()

    @pytest.fixture
    def headers(self, auth_token):
        """Get authorization headers."""
        return {"Authorization": f"Bearer {auth_token}"}

    def wait_for_task(self, task_id, headers, timeout=120):
        """Wait for the task to reach a terminal state; return None on timeout."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}",
                headers=headers,
            )
            if response.status_code == 200:
                task = response.json()
                if task.get("status") in ["completed", "failed"]:
                    return task
            time.sleep(2)
        return None

    def test_force_ocr_on_editable_pdf(self, headers):
        """Test forcing the OCR track on an editable PDF."""
        file_path = DEMO_DOCS_PATH / "edit.pdf"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers,
            )
        assert response.status_code == 200, f"Upload failed: {response.text}"
        task_id = response.json()["task_id"]

        # Force OCR track
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params={"use_dual_track": True, "force_track": "ocr"},
        )
        assert response.status_code == 200
        print("\nForcing OCR track on editable PDF")
        print(f"Task ID: {task_id}")

        # Wait for completion
        task = self.wait_for_task(task_id, headers, timeout=180)
        assert task is not None
        assert task.get("status") == "completed"

        # Verify the OCR track was used
        response = requests.get(
            f"{API_BASE_URL}/tasks/{task_id}/metadata",
            headers=headers,
        )
        if response.status_code == 200:
            metadata = response.json()
            print(f"Processing Track: {metadata.get('processing_track')}")
            assert metadata.get("processing_track") == "ocr"

        print("[PASS] Force OCR track successful")

    def test_force_direct_on_scanned_pdf(self, headers):
        """Test forcing the direct track on a scanned PDF.

        Processing should still run, but results may be poor because a scanned
        PDF has no extractable text layer.
        """
        file_path = DEMO_DOCS_PATH / "scan.pdf"
        if not file_path.exists():
            pytest.skip(f"Test file not found: {file_path}")

        # Upload file
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests.post(
                f"{API_BASE_URL}/upload",
                files=files,
                headers=headers,
            )
        assert response.status_code == 200, f"Upload failed: {response.text}"
        task_id = response.json()["task_id"]

        # Force direct track
        response = requests.post(
            f"{API_BASE_URL}/tasks/{task_id}/start",
            headers=headers,
            params={"use_dual_track": True, "force_track": "direct"},
        )
        assert response.status_code == 200
        print("\nForcing direct track on scanned PDF")
        print(f"Task ID: {task_id}")

        # Wait for a terminal state; the task may complete or fail, since the
        # scanned PDF has no extractable text
        task = self.wait_for_task(task_id, headers, timeout=120)
        assert task is not None

        if task.get("status") == "completed":
            response = requests.get(
                f"{API_BASE_URL}/tasks/{task_id}/metadata",
                headers=headers,
            )
            if response.status_code == 200:
                metadata = response.json()
                print(f"Processing Track: {metadata.get('processing_track')}")
                # Should be direct, as forced
                assert metadata.get("processing_track") == "direct"

        print("[PASS] Force direct track test complete")

if __name__ == "__main__":
    pytest.main([__file__, "-v", "-s"])