Files
OCR/openspec/changes/migrate-to-external-api-authentication/test_external_auth.py
egg 28e419f5fa proposal: migrate to external API authentication
Create OpenSpec proposal for migrating from local database authentication
to external API authentication using Microsoft Azure AD.

Changes proposed:
- Replace local username/password auth with external API
- Integrate with https://pj-auth-api.vercel.app/api/auth/login
- Use Azure AD tokens instead of local JWT
- Display user 'name' from API response in UI
- Maintain backward compatibility with feature flag

Benefits:
- Single Sign-On (SSO) capability
- Leverage enterprise identity management
- Reduce local user management overhead
- Consistent authentication across applications

Database changes:
- Add external_user_id for Azure AD user mapping
- Add display_name for UI display
- Keep existing schema for rollback capability

Implementation includes:
- Detailed migration plan with phased rollout
- Comprehensive task list for implementation
- Test script for API validation
- Risk assessment and mitigation strategies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-14 15:14:48 +08:00

226 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Proof of Concept: External API Authentication Test
Tests the external authentication API at https://pj-auth-api.vercel.app
"""
import asyncio
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional

import httpx
from pydantic import BaseModel, Field
class UserInfo(BaseModel):
    """Profile of the authenticated user as delivered by the external API.

    The camelCase keys of the payload are mapped onto snake_case attributes
    through field aliases.
    """

    # Identity fields; all three are required.
    id: str
    name: str
    email: str

    # Optional profile details: a missing key yields None / an empty list.
    job_title: Optional[str] = Field(default=None, alias="jobTitle")
    office_location: Optional[str] = Field(default=None, alias="officeLocation")
    business_phones: list[str] = Field(alias="businessPhones", default_factory=list)
class AuthSuccessData(BaseModel):
    """Payload found under ``data`` in a successful authentication response.

    Every field is required; the camelCase payload keys are bound through
    aliases.
    """

    # Token material.
    access_token: str
    id_token: str
    expires_in: int
    token_type: str

    # Nested user profile plus issue/expiry timestamps, kept as raw strings.
    user_info: UserInfo = Field(..., alias="userInfo")
    issued_at: str = Field(..., alias="issuedAt")
    expires_at: str = Field(..., alias="expiresAt")
class AuthSuccessResponse(BaseModel):
    """Envelope the client validates a status-200 login response against."""

    # Outcome flag and accompanying message.
    success: bool
    message: str

    # Tokens and user profile.
    data: AuthSuccessData

    # Timestamp supplied in the response body, kept as a raw string.
    timestamp: str
class AuthErrorResponse(BaseModel):
    """Envelope the client validates a status-401 login response against."""

    # Outcome flag.
    success: bool

    # Error description and its code, both surfaced to the caller.
    error: str
    code: str

    # Timestamp supplied in the response body, kept as a raw string.
    timestamp: str
class ExternalAuthClient:
    """Client for the external authentication API.

    Issues a single ``POST {base_url}/api/auth/login`` request and converts
    the outcome (success, rejected credentials, unexpected status, malformed
    body, transport failure) into a plain result dictionary.  Request and
    response details are printed to stdout along the way.
    """

    def __init__(self, base_url: str = "https://pj-auth-api.vercel.app", timeout: int = 30):
        """Store the connection settings.

        Args:
            base_url: Scheme and host of the authentication service.
            timeout: Per-request timeout in seconds, passed to httpx.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.endpoint = "/api/auth/login"

    @staticmethod
    def _dump_model(model: Any) -> Dict[str, Any]:
        """Serialise a Pydantic model to a dict on both Pydantic v1 and v2.

        ``model_dump()`` is preferred when the model provides it because
        ``.dict()`` is deprecated on Pydantic v2; otherwise ``.dict()`` is used.
        """
        dump = getattr(model, "model_dump", None)
        if callable(dump):
            return dump()
        return model.dict()

    async def authenticate(self, username: str, password: str) -> Dict[str, Any]:
        """Authenticate a user against the external API.

        Args:
            username: User email/username.
            password: User password.

        Returns:
            A dictionary that always contains ``"success"``.

            * Status 200: ``success=True`` plus ``status_code``, ``data`` (the
              validated response as a dict), ``user_display_name``,
              ``user_email``, ``token``, ``expires_in`` and ``expires_at``.
            * Status 401: ``success=False`` plus ``status_code``, ``error``
              and ``code`` taken from the response body.
            * Any other status: ``success=False`` plus ``status_code``,
              ``error`` and the raw ``response`` body.
            * Body that is not valid JSON: ``success=False`` plus
              ``status_code``, ``error`` and ``code="INVALID_RESPONSE"``.
            * Timeout, request error or any other exception raised while
              sending or interpreting the request: ``success=False`` plus
              ``error`` and ``code`` (``"TIMEOUT"``, ``"REQUEST_ERROR"`` or
              ``"UNKNOWN_ERROR"``).
        """
        url = f"{self.base_url}{self.endpoint}"
        print(f" Endpoint: POST {url}")
        print(f" Username: {username}")
        print(f" Timestamp: {datetime.now().isoformat()}")
        print()

        async with httpx.AsyncClient() as client:
            try:
                # Measure latency with a monotonic clock so that wall-clock
                # adjustments cannot distort (or negate) the reported time.
                started = time.perf_counter()
                response = await client.post(
                    url,
                    json={"username": username, "password": password},
                    timeout=self.timeout,
                )
                elapsed = time.perf_counter() - started

                print("Response Details:")
                print(f" Status Code: {response.status_code}")
                print(f" Response Time: {elapsed:.3f}s")
                print(f" Content-Type: {response.headers.get('content-type', 'N/A')}")
                print()

                # A gateway or error page may answer with a non-JSON body;
                # report that explicitly together with the status code rather
                # than letting it surface as an anonymous unknown error.
                try:
                    response_data = response.json()
                except ValueError as exc:
                    print(f"❌ Response body is not valid JSON: {exc}")
                    return {
                        "success": False,
                        "status_code": response.status_code,
                        "error": f"Response body is not valid JSON: {exc}",
                        "code": "INVALID_RESPONSE",
                    }

                # NOTE(review): this prints the complete body, including any
                # tokens it contains - confirm that is acceptable wherever
                # the output of this script is captured or logged.
                print("Response Body:")
                print(json.dumps(response_data, indent=2, ensure_ascii=False))
                print()

                if response.status_code == 200:
                    auth_response = AuthSuccessResponse(**response_data)
                    return {
                        "success": True,
                        "status_code": response.status_code,
                        "data": self._dump_model(auth_response),
                        "user_display_name": auth_response.data.user_info.name,
                        "user_email": auth_response.data.user_info.email,
                        "token": auth_response.data.access_token,
                        "expires_in": auth_response.data.expires_in,
                        "expires_at": auth_response.data.expires_at,
                    }

                if response.status_code == 401:
                    error_response = AuthErrorResponse(**response_data)
                    return {
                        "success": False,
                        "status_code": response.status_code,
                        "error": error_response.error,
                        "code": error_response.code,
                    }

                return {
                    "success": False,
                    "status_code": response.status_code,
                    "error": f"Unexpected status code: {response.status_code}",
                    "response": response_data,
                }
            except httpx.TimeoutException:
                # Must precede RequestError so timeouts keep their own code.
                print(f"❌ Request timeout after {self.timeout} seconds")
                return {
                    "success": False,
                    "error": "Request timeout",
                    "code": "TIMEOUT",
                }
            except httpx.RequestError as e:
                print(f"❌ Request error: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "code": "REQUEST_ERROR",
                }
            except Exception as e:
                # Deliberate catch-all (e.g. a body that fails model
                # validation): this method reports failures as a result
                # dictionary instead of raising.
                print(f"❌ Unexpected error: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "code": "UNKNOWN_ERROR",
                }
async def test_authentication():
    """Run the sample login scenarios and report how each one turned out.

    Every scenario is sent through ``ExternalAuthClient.authenticate``.  The
    result is printed, then compared with the scenario's ``"expected"``
    outcome (``"success"`` or ``"failure"``) so that a mismatch is visible
    in the output.  A one-line summary is printed after the last scenario.

    The function performs real network requests and returns nothing.
    """
    client = ExternalAuthClient()

    # Test scenarios
    test_cases = [
        {
            "name": "Valid Credentials (Example)",
            "username": "ymirliu@panjit.com.tw",
            # Placeholder: replace with an actual password for testing.
            # Until then this scenario is reported as not meeting its
            # expectation.
            "password": "correct_password",
            "expected": "success",
        },
        {
            "name": "Invalid Credentials",
            "username": "test@example.com",
            "password": "wrong_password",
            "expected": "failure",
        },
    ]

    matched = 0
    for i, test_case in enumerate(test_cases, 1):
        print(f"{'='*60}")
        print(f"Test Case {i}: {test_case['name']}")
        print(f"{'='*60}")

        result = await client.authenticate(
            username=test_case["username"],
            password=test_case["password"],
        )

        # Analyze result
        print("\nAnalysis:")
        if result["success"]:
            print("✅ Authentication successful")
            print(f" User: {result.get('user_display_name', 'N/A')}")
            print(f" Email: {result.get('user_email', 'N/A')}")
            print(f" Token expires in: {result.get('expires_in', 0)} seconds")
            print(f" Expires at: {result.get('expires_at', 'N/A')}")
        else:
            print("❌ Authentication failed")
            print(f" Error: {result.get('error', 'Unknown error')}")
            print(f" Code: {result.get('code', 'N/A')}")

        # Compare the actual outcome with what the scenario declared; without
        # this check the "expected" entries would be dead data.
        actual = "success" if result["success"] else "failure"
        if actual == test_case["expected"]:
            matched += 1
            print(f" Expectation met: {actual}")
        else:
            print(f" Expectation NOT met: expected {test_case['expected']}, got {actual}")
        print("\n")

    print(f"Summary: {matched}/{len(test_cases)} test cases matched their expected outcome")
async def test_token_validation():
    """Placeholder for the token validation and refresh checks.

    Nothing is validated yet; the coroutine only prints a reminder that the
    test is still to be written.
    """
    # To be filled in once a valid token is available.
    placeholder_notice = "Token validation test - To be implemented with actual tokens"
    print(placeholder_notice)
def main():
    """Script entry point: print a banner, run the tests, print follow-up notes."""
    implementation_notes = (
        "1. Use httpx for async HTTP requests (already in requirements)",
        "2. Store tokens securely (consider encryption)",
        "3. Implement automatic token refresh before expiration",
        "4. Handle network failures with retry logic",
        "5. Map external user ID to local user records",
        "6. Display user 'name' field in UI instead of username",
    )

    print("External Authentication API Test")
    print("================================\n")

    # Drive the async test coroutine to completion on a fresh event loop.
    asyncio.run(test_authentication())

    print("\nTest completed!")
    print("\nNotes for implementation:")
    for note in implementation_notes:
        print(note)


# Run the tests only when executed as a script, not when imported.
if __name__ == "__main__":
    main()