#!/usr/bin/env python3
"""
Test script for Office document processing
"""

import json
import sys
import time
import traceback
from pathlib import Path

import requests

API_BASE = "http://localhost:12010/api/v1"
USERNAME = "admin"
PASSWORD = "admin123"

# Per-request timeout in seconds. Without one, `requests` waits forever on an
# unresponsive server and the test script would hang instead of failing.
REQUEST_TIMEOUT = 60


def login():
    """Log in with USERNAME/PASSWORD and return the JWT access token.

    Returns:
        The ``access_token`` value from the login response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx response.
        KeyError: If the response lacks ``access_token`` or ``expires_in``.
    """
    print("Step 1: Logging in...")
    response = requests.post(
        f"{API_BASE}/auth/login",
        json={"username": USERNAME, "password": PASSWORD},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    data = response.json()
    token = data["access_token"]
    print(f"✓ Login successful. Token expires in: {data['expires_in']} seconds ({data['expires_in']//3600} hours)")
    return token


def upload_file(token, file_path):
    """Upload a file to the ``/upload`` endpoint and return the new batch ID.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        file_path: ``pathlib.Path`` of the document to upload; it is sent
            with the .docx (wordprocessingml) MIME type.

    Returns:
        The ``id`` field of the upload response.

    Raises:
        OSError: If the file cannot be opened.
        requests.RequestException: On connection failure, timeout, or a
            non-2xx response.
    """
    print(f"\nStep 2: Uploading file: {file_path.name}...")
    with open(file_path, 'rb') as f:
        files = {
            'files': (
                file_path.name,
                f,
                'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            )
        }
        # The request must be issued while the file handle is still open.
        response = requests.post(
            f"{API_BASE}/upload",
            headers={"Authorization": f"Bearer {token}"},
            files=files,
            data={"batch_name": "Office Document Test"},
            timeout=REQUEST_TIMEOUT,
        )
    response.raise_for_status()
    result = response.json()
    print("✓ File uploaded and batch created:")
    print(f" Batch ID: {result['id']}")
    print(f" Total files: {result['total_files']}")
    print(f" Status: {result['status']}")
    return result['id']


def trigger_ocr(token, batch_id):
    """Request OCR processing for a batch via ``/ocr/process``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        batch_id: ID of the batch to process.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx response.
    """
    print("\nStep 3: Triggering OCR processing...")
    response = requests.post(
        f"{API_BASE}/ocr/process",
        headers={"Authorization": f"Bearer {token}"},
        json={
            "batch_id": batch_id,
            "lang": "ch",
            "detect_layout": True,
        },
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    result = response.json()
    print("✓ OCR processing started")
    print(f" Message: {result['message']}")
    print(f" Total files: {result['total_files']}")


def check_status(token, batch_id):
    """Poll the batch status until it completes, fails, or the wait runs out.

    The status endpoint is queried every 5 seconds for up to 120 seconds of
    accumulated sleep time.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        batch_id: ID of the batch to poll.

    Returns:
        The decoded status response once the batch status is ``completed``
        or ``failed``, or ``None`` if neither state is reached in time.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx response.
    """
    print("\nStep 4: Checking processing status...")
    max_wait = 120  # 120 seconds max
    poll_interval = 5
    waited = 0

    while waited < max_wait:
        response = requests.get(
            f"{API_BASE}/batch/{batch_id}/status",
            headers={"Authorization": f"Bearer {token}"},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        data = response.json()

        batch_status = data['batch']['status']
        progress = data['batch']['progress_percentage']
        # Tolerate a missing/empty file list instead of raising IndexError
        # while merely reporting progress.
        files = data.get('files') or []
        first_file = files[0] if files else {}
        file_status = first_file.get('status', 'unknown')

        print(f" Batch status: {batch_status}, Progress: {progress}%, File status: {file_status}")

        if batch_status == 'completed':
            print("\n✓ Processing completed!")
            # The key may be present but null; formatting None with :.2f
            # would raise TypeError.
            processing_time = first_file.get('processing_time')
            if processing_time is not None:
                print(f" Processing time: {processing_time:.2f} seconds")
            return data
        if batch_status == 'failed':
            print("\n✗ Processing failed!")
            print(f" Error: {first_file.get('error_message') or 'Unknown error'}")
            return data

        time.sleep(poll_interval)
        waited += poll_interval

    print(f"\n⚠ Timeout waiting for processing (waited {waited}s)")
    return None


def get_result(token, file_id):
    """Fetch and print the OCR result for one file.

    If the result names a ``markdown_path``, the first 300 characters of
    that file are read from the local filesystem and printed as a preview;
    failure to read it is reported but not fatal.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        file_id: ID of the processed file.

    Returns:
        The decoded response from ``/ocr/result/{file_id}``.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx response.
    """
    print("\nStep 5: Getting OCR result...")
    response = requests.get(
        f"{API_BASE}/ocr/result/{file_id}",
        headers={"Authorization": f"Bearer {token}"},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    data = response.json()

    file_info = data['file']
    result = data.get('result')

    print("✓ OCR Result retrieved:")
    print(f" File: {file_info['original_filename']}")
    print(f" Status: {file_info['status']}")

    if not result:
        print(" No OCR result available yet")
        return data

    # A null confidence would make the :.2% format raise TypeError.
    confidence = result.get('average_confidence') or 0
    print(f" Language: {result.get('detected_language', 'N/A')}")
    print(f" Total text regions: {result.get('total_text_regions', 0)}")
    print(f" Average confidence: {confidence:.2%}")

    # Read markdown file if available
    markdown_path = result.get('markdown_path')
    if markdown_path:
        try:
            with open(markdown_path, 'r', encoding='utf-8') as f:
                # Only the preview is needed; avoid loading the whole file.
                preview = f.read(300)
        except (OSError, ValueError) as e:
            # ValueError also covers UnicodeDecodeError. The preview is
            # best-effort, so report and carry on.
            print(f" Could not read markdown file: {e}")
        else:
            print("\n Markdown preview (first 300 chars):")
            print(f" {'-'*60}")
            print(f" {preview}...")
            print(f" {'-'*60}")

    return data


def main():
    """Run the login, upload, OCR and result-retrieval test end to end.

    Returns:
        0 if the batch reached ``completed`` and its result was fetched,
        1 if the test file is missing, processing failed or timed out, or
        any step raised an exception.
    """
    try:
        # Test file
        test_file = Path('/Users/egg/Projects/Tool_OCR/demo_docs/office_tests/test_document.docx')

        if not test_file.exists():
            print(f"✗ Test file not found: {test_file}")
            return 1

        print("="*70)
        print("Office Document Processing Test")
        print("="*70)
        print(f"Test file: {test_file.name} ({test_file.stat().st_size} bytes)")
        print("="*70)

        # Run test
        token = login()
        batch_id = upload_file(token, test_file)
        trigger_ocr(token, batch_id)
        status_data = check_status(token, batch_id)

        files = (status_data or {}).get('files') or []
        if status_data and status_data['batch']['status'] == 'completed' and files:
            file_id = files[0]['id']
            get_result(token, file_id)

            print("\n" + "="*70)
            print("✓ TEST PASSED: Office document processing successful!")
            print("="*70)
            return 0

        print("\n" + "="*70)
        print("✗ TEST FAILED: Processing did not complete successfully")
        print("="*70)
        return 1

    except Exception as e:
        # Top-level boundary: report the failure with a traceback and signal
        # it through the exit status rather than crashing.
        print(f"\n✗ TEST ERROR: {str(e)}")
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    # Propagate pass/fail to the shell so CI or wrapper scripts can detect it.
    sys.exit(main())