Create OpenSpec proposal for migrating from local database authentication to external API authentication using Microsoft Azure AD.

Changes proposed:
- Replace local username/password auth with external API
- Integrate with https://pj-auth-api.vercel.app/api/auth/login
- Use Azure AD tokens instead of local JWT
- Display user 'name' from API response in UI
- Maintain backward compatibility with feature flag

Benefits:
- Single Sign-On (SSO) capability
- Leverage enterprise identity management
- Reduce local user management overhead
- Consistent authentication across applications

Database changes:
- Add external_user_id for Azure AD user mapping
- Add display_name for UI display
- Keep existing schema for rollback capability

Implementation includes:
- Detailed migration plan with phased rollout
- Comprehensive task list for implementation
- Test script for API validation
- Risk assessment and mitigation strategies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
226 lines
7.5 KiB
Python
226 lines
7.5 KiB
Python
#!/usr/bin/env python3
"""
Proof of Concept: External API Authentication Test

Tests the external authentication API at https://pj-auth-api.vercel.app
"""

import asyncio
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional

import httpx
from pydantic import BaseModel, Field


class UserInfo(BaseModel):
    """Profile of the authenticated user as returned by the external API.

    Fields declared with an alias are read from the camelCase key of the
    JSON payload.
    """

    id: str
    name: str  # printed as the user's display name by the test report
    email: str
    # Optional profile attributes; they default to None when absent.
    job_title: Optional[str] = Field(default=None, alias="jobTitle")
    office_location: Optional[str] = Field(default=None, alias="officeLocation")
    # Defaults to a fresh empty list when the key is missing.
    business_phones: list[str] = Field(default_factory=list, alias="businessPhones")


class AuthSuccessData(BaseModel):
    """Payload carried in the ``data`` member of a successful login response.

    Snake_case keys are read as-is; aliased fields are read from their
    camelCase key.
    """

    # Token material.
    access_token: str
    id_token: str
    expires_in: int  # the test report prints this value as seconds
    token_type: str

    # Nested user profile and token validity window (kept as raw strings).
    user_info: UserInfo = Field(alias="userInfo")
    issued_at: str = Field(alias="issuedAt")
    expires_at: str = Field(alias="expiresAt")


class AuthSuccessResponse(BaseModel):
    """Top-level envelope of a successful authentication response.

    The client parses the body into this model when the API answers with
    HTTP 200.
    """

    success: bool
    message: str
    data: AuthSuccessData  # tokens and user profile
    timestamp: str


class AuthErrorResponse(BaseModel):
    """Top-level envelope of a failed authentication response.

    The client parses the body into this model when the API answers with
    HTTP 401.
    """

    success: bool
    error: str  # error message text
    code: str  # error code string supplied by the API
    timestamp: str


class ExternalAuthClient:
    """Client for external authentication API.

    Posts username/password credentials to the login endpoint and converts
    the HTTP response into a plain result dictionary. Request and response
    details are printed to stdout; the password is never printed.
    """

    def __init__(self, base_url: str = "https://pj-auth-api.vercel.app", timeout: int = 30):
        """
        Configure the client.

        Args:
            base_url: Root URL of the authentication service. Trailing
                slashes are removed so that joining it with the endpoint
                path never yields a double slash.
            timeout: Per-request timeout in seconds, passed to httpx.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.endpoint = "/api/auth/login"

    async def authenticate(self, username: str, password: str) -> Dict[str, Any]:
        """
        Authenticate user with external API

        Args:
            username: User email/username
            password: User password

        Returns:
            Authentication result dictionary. It always contains
            ``success``; the remaining keys depend on the outcome:

            - HTTP 200: ``status_code``, ``data`` (parsed response as a
              dict), ``user_display_name``, ``user_email``, ``token``,
              ``expires_in`` and ``expires_at``.
            - HTTP 401: ``status_code``, ``error`` and ``code`` taken from
              the API's error body.
            - Any other status: ``status_code``, ``error`` and the raw
              parsed ``response``.
            - Body that is not valid JSON: ``status_code``, ``error``,
              ``code`` ("INVALID_RESPONSE") and the raw ``response`` text.
            - Timeout / transport error / anything else: ``error`` and
              ``code`` ("TIMEOUT", "REQUEST_ERROR" or "UNKNOWN_ERROR").

            The method does not raise for these failures; they are
            reported through the returned dictionary.
        """
        url = f"{self.base_url}{self.endpoint}"

        print(f"ℹ Endpoint: POST {url}")
        print(f"ℹ Username: {username}")
        print(f"ℹ Timestamp: {datetime.now().isoformat()}")
        print()

        async with httpx.AsyncClient() as client:
            try:
                # Make authentication request. A monotonic clock is used for
                # the duration so wall-clock adjustments cannot skew it.
                start_time = time.perf_counter()
                response = await client.post(
                    url,
                    json={"username": username, "password": password},
                    timeout=self.timeout,
                )
                elapsed = time.perf_counter() - start_time

                # Print response details
                print("Response Details:")
                print(f" Status Code: {response.status_code}")
                print(f" Response Time: {elapsed:.3f}s")
                print(f" Content-Type: {response.headers.get('content-type', 'N/A')}")
                print()

                # Parse response. Gateways and proxies may answer with HTML
                # or an empty body; report that explicitly and keep the
                # status code instead of degrading to UNKNOWN_ERROR.
                try:
                    response_data = response.json()
                except ValueError:
                    print("❌ Response body is not valid JSON")
                    return {
                        "success": False,
                        "status_code": response.status_code,
                        "error": "Response body is not valid JSON",
                        "code": "INVALID_RESPONSE",
                        "response": response.text,
                    }

                print("Response Body:")
                print(json.dumps(response_data, indent=2, ensure_ascii=False))
                print()

                # Handle success/failure
                if response.status_code == 200:
                    auth_response = AuthSuccessResponse(**response_data)
                    # pydantic v2 deprecates .dict() in favour of
                    # .model_dump(); support whichever is installed.
                    if hasattr(auth_response, "model_dump"):
                        parsed = auth_response.model_dump()
                    else:
                        parsed = auth_response.dict()
                    return {
                        "success": True,
                        "status_code": response.status_code,
                        "data": parsed,
                        "user_display_name": auth_response.data.user_info.name,
                        "user_email": auth_response.data.user_info.email,
                        "token": auth_response.data.access_token,
                        "expires_in": auth_response.data.expires_in,
                        "expires_at": auth_response.data.expires_at,
                    }
                elif response.status_code == 401:
                    error_response = AuthErrorResponse(**response_data)
                    return {
                        "success": False,
                        "status_code": response.status_code,
                        "error": error_response.error,
                        "code": error_response.code,
                    }
                else:
                    return {
                        "success": False,
                        "status_code": response.status_code,
                        "error": f"Unexpected status code: {response.status_code}",
                        "response": response_data,
                    }

            except httpx.TimeoutException:
                print(f"❌ Request timeout after {self.timeout} seconds")
                return {
                    "success": False,
                    "error": "Request timeout",
                    "code": "TIMEOUT",
                }
            except httpx.RequestError as e:
                print(f"❌ Request error: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "code": "REQUEST_ERROR",
                }
            except Exception as e:
                # Deliberate catch-all for this PoC: schema validation
                # failures and other surprises are reported, not raised.
                print(f"❌ Unexpected error: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "code": "UNKNOWN_ERROR",
                }


async def test_authentication():
    """Test authentication with different scenarios

    Runs each scenario through ``ExternalAuthClient.authenticate``, prints
    the details of the result and then reports whether the outcome
    ("success" or "failure") matches the scenario's ``expected`` value.
    Nothing is returned or raised on a mismatch; the report is printed only.
    """
    client = ExternalAuthClient()

    # Test scenarios
    # NOTE(review): the first username looks like a real account address;
    # consider supplying credentials via environment variables instead.
    test_cases = [
        {
            "name": "Valid Credentials (Example)",
            "username": "ymirliu@panjit.com.tw",
            "password": "correct_password",  # Replace with actual password for testing
            "expected": "success",
        },
        {
            "name": "Invalid Credentials",
            "username": "test@example.com",
            "password": "wrong_password",
            "expected": "failure",
        },
    ]

    for i, test_case in enumerate(test_cases, 1):
        print(f"{'='*60}")
        print(f"Test Case {i}: {test_case['name']}")
        print(f"{'='*60}")

        result = await client.authenticate(
            username=test_case["username"],
            password=test_case["password"],
        )

        # Analyze result
        print("\nAnalysis:")
        if result["success"]:
            print("✅ Authentication successful")
            print(f" User: {result.get('user_display_name', 'N/A')}")
            print(f" Email: {result.get('user_email', 'N/A')}")
            print(f" Token expires in: {result.get('expires_in', 0)} seconds")
            print(f" Expires at: {result.get('expires_at', 'N/A')}")
        else:
            print("❌ Authentication failed")
            print(f" Error: {result.get('error', 'Unknown error')}")
            print(f" Code: {result.get('code', 'N/A')}")

        # Compare with the declared expectation so a scenario that behaves
        # unexpectedly is visible in the report.
        actual = "success" if result["success"] else "failure"
        verdict = "as expected" if actual == test_case["expected"] else "UNEXPECTED"
        print(f" Expected: {test_case['expected']}, got: {actual} ({verdict})")

        print("\n")


async def test_token_validation():
    """Placeholder for the token validation and refresh tests.

    Currently it only prints a notice and verifies nothing.
    """
    # Real checks need a valid token, so they are deferred.
    print("Token validation test - To be implemented with actual tokens")


def main():
    """Script entry point.

    Prints a banner, runs the authentication scenarios on a new asyncio
    event loop, then prints the follow-up notes for the implementation.
    """
    implementation_notes = (
        "1. Use httpx for async HTTP requests (already in requirements)",
        "2. Store tokens securely (consider encryption)",
        "3. Implement automatic token refresh before expiration",
        "4. Handle network failures with retry logic",
        "5. Map external user ID to local user records",
        "6. Display user 'name' field in UI instead of username",
    )

    print("External Authentication API Test")
    print("================================\n")

    # Run tests
    asyncio.run(test_authentication())

    print("\nTest completed!")
    print("\nNotes for implementation:")
    for note in implementation_notes:
        print(note)


# Run the proof of concept only when executed as a script, not on import.
if __name__ == "__main__":
    main()