Root Cause Fixed:
- Tests were connecting to production MySQL database instead of test database
- Solution: Monkey patch database module before importing app to use SQLite :memory:
Changes:
1. **conftest.py** - Critical Fix:
- Added database module monkey patch BEFORE app import
- Prevents connection to production database (db_A060)
- All tests now use isolated SQLite :memory: database
- Fixed fixture dependency order (test_task depends on test_user)
2. **test_tasks.py**:
- Fixed test_delete_task: Accept 204 No Content (correct HTTP status)
3. **test_admin.py**:
- Fixed test_get_system_stats: Update assertions to match nested API response structure
- API returns {users: {total}, tasks: {total}} not flat structure
4. **test_integration.py**:
- Fixed mock structure: Use Pydantic models (AuthResponse, UserInfo) instead of dicts
- Fixed test_complete_auth_and_task_flow: Accept 204 for DELETE
Test Results:
✅ test_auth.py: 5/5 passing (100%)
✅ test_tasks.py: 6/6 passing (100%)
✅ test_admin.py: 4/4 passing (100%)
✅ test_integration.py: 3/3 passing (100%)
Total: 18/18 tests passing (100%) ⬆️ from 11/18 (61%)
Security Note:
- Tests no longer access production database
- All test data is isolated in :memory: SQLite
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
183 lines
5.9 KiB
Python
183 lines
5.9 KiB
Python
"""
|
|
Integration tests for Tool_OCR
|
|
Tests the complete flow of authentication, task creation, and file operations
|
|
"""
|
|
|
|
import pytest
|
|
from unittest.mock import patch
|
|
|
|
|
|
class TestIntegration:
|
|
"""Integration tests for end-to-end workflows"""
|
|
|
|
def test_complete_auth_and_task_flow(self, client, db):
|
|
"""Test complete flow: login -> create task -> get task -> delete task"""
|
|
|
|
# Step 1: Login
|
|
from app.services.external_auth_service import AuthResponse, UserInfo
|
|
|
|
user_info = UserInfo(
|
|
id="integration-id-123",
|
|
name="Integration Test User",
|
|
email="integration@example.com"
|
|
)
|
|
auth_response = AuthResponse(
|
|
access_token="test-token",
|
|
id_token="test-id-token",
|
|
expires_in=3600,
|
|
token_type="Bearer",
|
|
user_info=user_info,
|
|
issued_at="2025-11-16T10:00:00Z",
|
|
expires_at="2025-11-16T11:00:00Z"
|
|
)
|
|
|
|
with patch('app.routers.auth.external_auth_service.authenticate_user') as mock_auth:
|
|
mock_auth.return_value = (True, auth_response, None)
|
|
|
|
login_response = client.post('/api/v2/auth/login', json={
|
|
'username': 'integration@example.com',
|
|
'password': 'password123'
|
|
})
|
|
|
|
assert login_response.status_code == 200
|
|
token = login_response.json()['access_token']
|
|
headers = {'Authorization': f'Bearer {token}'}
|
|
|
|
# Step 2: Create task
|
|
create_response = client.post(
|
|
'/api/v2/tasks/',
|
|
headers=headers,
|
|
json={
|
|
'filename': 'integration_test.pdf',
|
|
'file_type': 'application/pdf'
|
|
}
|
|
)
|
|
|
|
assert create_response.status_code == 201
|
|
task_data = create_response.json()
|
|
task_id = task_data['task_id']
|
|
|
|
# Step 3: Get task
|
|
get_response = client.get(
|
|
f'/api/v2/tasks/{task_id}',
|
|
headers=headers
|
|
)
|
|
|
|
assert get_response.status_code == 200
|
|
assert get_response.json()['task_id'] == task_id
|
|
|
|
# Step 4: List tasks
|
|
list_response = client.get(
|
|
'/api/v2/tasks/',
|
|
headers=headers
|
|
)
|
|
|
|
assert list_response.status_code == 200
|
|
assert len(list_response.json()['tasks']) > 0
|
|
|
|
# Step 5: Get stats
|
|
stats_response = client.get(
|
|
'/api/v2/tasks/stats',
|
|
headers=headers
|
|
)
|
|
|
|
assert stats_response.status_code == 200
|
|
stats = stats_response.json()
|
|
assert stats['total'] > 0
|
|
assert stats['pending'] > 0
|
|
|
|
# Step 6: Delete task
|
|
delete_response = client.delete(
|
|
f'/api/v2/tasks/{task_id}',
|
|
headers=headers
|
|
)
|
|
|
|
# DELETE returns 204 No Content (standard for successful deletion)
|
|
assert delete_response.status_code == 204
|
|
|
|
# Step 7: Verify deletion
|
|
get_after_delete = client.get(
|
|
f'/api/v2/tasks/{task_id}',
|
|
headers=headers
|
|
)
|
|
|
|
assert get_after_delete.status_code == 404
|
|
|
|
def test_admin_workflow(self, client, db):
|
|
"""Test admin workflow: login as admin -> access admin endpoints"""
|
|
|
|
# Login as admin
|
|
from app.services.external_auth_service import AuthResponse, UserInfo
|
|
|
|
user_info = UserInfo(
|
|
id="admin-id-123",
|
|
name="Admin User",
|
|
email="ymirliu@panjit.com.tw"
|
|
)
|
|
auth_response = AuthResponse(
|
|
access_token="admin-token",
|
|
id_token="admin-id-token",
|
|
expires_in=3600,
|
|
token_type="Bearer",
|
|
user_info=user_info,
|
|
issued_at="2025-11-16T10:00:00Z",
|
|
expires_at="2025-11-16T11:00:00Z"
|
|
)
|
|
|
|
with patch('app.routers.auth.external_auth_service.authenticate_user') as mock_auth:
|
|
mock_auth.return_value = (True, auth_response, None)
|
|
|
|
login_response = client.post('/api/v2/auth/login', json={
|
|
'username': 'ymirliu@panjit.com.tw',
|
|
'password': 'adminpass'
|
|
})
|
|
|
|
assert login_response.status_code == 200
|
|
token = login_response.json()['access_token']
|
|
headers = {'Authorization': f'Bearer {token}'}
|
|
|
|
# Access admin endpoints
|
|
stats_response = client.get('/api/v2/admin/stats', headers=headers)
|
|
assert stats_response.status_code == 200
|
|
|
|
users_response = client.get('/api/v2/admin/users', headers=headers)
|
|
assert users_response.status_code == 200
|
|
|
|
logs_response = client.get('/api/v2/admin/audit-logs', headers=headers)
|
|
assert logs_response.status_code == 200
|
|
|
|
def test_task_lifecycle(self, client, auth_token, test_task, db):
|
|
"""Test complete task lifecycle: pending -> processing -> completed"""
|
|
|
|
headers = {'Authorization': f'Bearer {auth_token}'}
|
|
|
|
# Check initial status
|
|
response = client.get(f'/api/v2/tasks/{test_task.task_id}', headers=headers)
|
|
assert response.json()['status'] == 'pending'
|
|
|
|
# Start task
|
|
start_response = client.post(
|
|
f'/api/v2/tasks/{test_task.task_id}/start',
|
|
headers=headers
|
|
)
|
|
assert start_response.status_code == 200
|
|
assert start_response.json()['status'] == 'processing'
|
|
|
|
# Update task to completed
|
|
update_response = client.patch(
|
|
f'/api/v2/tasks/{test_task.task_id}',
|
|
headers=headers,
|
|
json={
|
|
'status': 'completed',
|
|
'processing_time_ms': 1500
|
|
}
|
|
)
|
|
assert update_response.status_code == 200
|
|
assert update_response.json()['status'] == 'completed'
|
|
|
|
# Verify final state
|
|
final_response = client.get(f'/api/v2/tasks/{test_task.task_id}', headers=headers)
|
|
final_data = final_response.json()
|
|
assert final_data['status'] == 'completed'
|
|
assert final_data['processing_time_ms'] == 1500
|