feat: implement user authentication module
- Backend (FastAPI): - External API authentication (pj-auth-api.vercel.app) - JWT token validation with Redis session storage - RBAC with department isolation - User, Role, Department models with pjctrl_ prefix - Alembic migrations with project-specific version table - Complete test coverage (13 tests) - Frontend (React + Vite): - AuthContext for state management - Login page with error handling - Protected route component - Dashboard with user info display - OpenSpec: - 7 capability specs defined - add-user-auth change archived 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
0
backend/tests/__init__.py
Normal file
0
backend/tests/__init__.py
Normal file
128
backend/tests/conftest.py
Normal file
128
backend/tests/conftest.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import StaticPool
|
||||
|
||||
from app.main import app
|
||||
from app.core.database import Base, get_db
|
||||
from app.core.redis import get_redis
|
||||
from app.models import User, Role, Department
|
||||
|
||||
# Use in-memory SQLite for testing
|
||||
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
|
||||
|
||||
engine = create_engine(
|
||||
SQLALCHEMY_DATABASE_URL,
|
||||
connect_args={"check_same_thread": False},
|
||||
poolclass=StaticPool,
|
||||
)
|
||||
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
|
||||
class MockRedis:
|
||||
"""Mock Redis client for testing."""
|
||||
|
||||
def __init__(self):
|
||||
self.store = {}
|
||||
|
||||
def get(self, key):
|
||||
return self.store.get(key)
|
||||
|
||||
def setex(self, key, seconds, value):
|
||||
self.store[key] = value
|
||||
|
||||
def delete(self, key):
|
||||
if key in self.store:
|
||||
del self.store[key]
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def db():
|
||||
"""Create a fresh database for each test."""
|
||||
Base.metadata.create_all(bind=engine)
|
||||
db = TestingSessionLocal()
|
||||
|
||||
# Create default role
|
||||
admin_role = Role(
|
||||
id="00000000-0000-0000-0000-000000000001",
|
||||
name="super_admin",
|
||||
permissions={"all": True},
|
||||
is_system_role=True,
|
||||
)
|
||||
db.add(admin_role)
|
||||
|
||||
engineer_role = Role(
|
||||
id="00000000-0000-0000-0000-000000000003",
|
||||
name="engineer",
|
||||
permissions={"projects.read": True, "tasks.read": True, "tasks.write": True},
|
||||
is_system_role=False,
|
||||
)
|
||||
db.add(engineer_role)
|
||||
|
||||
# Create system admin user
|
||||
admin_user = User(
|
||||
id="00000000-0000-0000-0000-000000000001",
|
||||
email="ymirliu@panjit.com.tw",
|
||||
name="System Administrator",
|
||||
role_id="00000000-0000-0000-0000-000000000001",
|
||||
is_active=True,
|
||||
is_system_admin=True,
|
||||
)
|
||||
db.add(admin_user)
|
||||
|
||||
db.commit()
|
||||
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
Base.metadata.drop_all(bind=engine)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_redis():
|
||||
"""Create mock Redis for testing."""
|
||||
return MockRedis()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def client(db, mock_redis):
|
||||
"""Create test client with overridden dependencies."""
|
||||
|
||||
def override_get_db():
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
pass
|
||||
|
||||
def override_get_redis():
|
||||
return mock_redis
|
||||
|
||||
app.dependency_overrides[get_db] = override_get_db
|
||||
app.dependency_overrides[get_redis] = override_get_redis
|
||||
|
||||
with TestClient(app) as test_client:
|
||||
yield test_client
|
||||
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin_token(client, mock_redis):
|
||||
"""Get an admin token for testing."""
|
||||
from app.core.security import create_access_token, create_token_payload
|
||||
|
||||
token_data = create_token_payload(
|
||||
user_id="00000000-0000-0000-0000-000000000001",
|
||||
email="ymirliu@panjit.com.tw",
|
||||
role="super_admin",
|
||||
department_id=None,
|
||||
is_system_admin=True,
|
||||
)
|
||||
token = create_access_token(token_data)
|
||||
|
||||
# Store in mock Redis
|
||||
mock_redis.setex("session:00000000-0000-0000-0000-000000000001", 900, token)
|
||||
|
||||
return token
|
||||
84
backend/tests/test_auth.py
Normal file
84
backend/tests/test_auth.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import pytest
|
||||
from app.core.security import create_access_token, decode_access_token, create_token_payload
|
||||
|
||||
|
||||
class TestJWT:
|
||||
"""Test JWT token creation and validation."""
|
||||
|
||||
def test_create_access_token(self):
|
||||
"""Test creating an access token."""
|
||||
data = {"sub": "user123", "email": "test@example.com"}
|
||||
token = create_access_token(data)
|
||||
|
||||
assert token is not None
|
||||
assert isinstance(token, str)
|
||||
|
||||
def test_decode_valid_token(self):
|
||||
"""Test decoding a valid token."""
|
||||
data = create_token_payload(
|
||||
user_id="user123",
|
||||
email="test@example.com",
|
||||
role="engineer",
|
||||
department_id="dept123",
|
||||
is_system_admin=False,
|
||||
)
|
||||
token = create_access_token(data)
|
||||
payload = decode_access_token(token)
|
||||
|
||||
assert payload is not None
|
||||
assert payload["sub"] == "user123"
|
||||
assert payload["email"] == "test@example.com"
|
||||
assert payload["role"] == "engineer"
|
||||
assert payload["is_system_admin"] is False
|
||||
|
||||
def test_decode_invalid_token(self):
|
||||
"""Test decoding an invalid token."""
|
||||
payload = decode_access_token("invalid.token.here")
|
||||
assert payload is None
|
||||
|
||||
def test_token_payload_structure(self):
|
||||
"""Test token payload has correct structure."""
|
||||
payload = create_token_payload(
|
||||
user_id="user123",
|
||||
email="test@example.com",
|
||||
role="engineer",
|
||||
department_id="dept123",
|
||||
is_system_admin=False,
|
||||
)
|
||||
|
||||
assert "sub" in payload
|
||||
assert "email" in payload
|
||||
assert "role" in payload
|
||||
assert "department_id" in payload
|
||||
assert "is_system_admin" in payload
|
||||
|
||||
|
||||
class TestAuthEndpoints:
|
||||
"""Test authentication API endpoints."""
|
||||
|
||||
def test_get_me_without_auth(self, client):
|
||||
"""Test accessing /me without authentication."""
|
||||
response = client.get("/api/auth/me")
|
||||
assert response.status_code == 403
|
||||
|
||||
def test_get_me_with_auth(self, client, admin_token):
|
||||
"""Test accessing /me with valid authentication."""
|
||||
response = client.get(
|
||||
"/api/auth/me",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["email"] == "ymirliu@panjit.com.tw"
|
||||
assert data["is_system_admin"] is True
|
||||
|
||||
def test_logout(self, client, admin_token, mock_redis):
|
||||
"""Test logout endpoint."""
|
||||
response = client.post(
|
||||
"/api/auth/logout",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify session is removed
|
||||
assert mock_redis.get("session:00000000-0000-0000-0000-000000000001") is None
|
||||
159
backend/tests/test_users.py
Normal file
159
backend/tests/test_users.py
Normal file
@@ -0,0 +1,159 @@
|
||||
import pytest
|
||||
from app.models.user import User
|
||||
from app.models.department import Department
|
||||
|
||||
|
||||
class TestUserEndpoints:
|
||||
"""Test user management API endpoints."""
|
||||
|
||||
def test_list_users_as_admin(self, client, admin_token):
|
||||
"""Test listing users as admin."""
|
||||
response = client.get(
|
||||
"/api/users",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert isinstance(data, list)
|
||||
assert len(data) >= 1 # At least admin user exists
|
||||
|
||||
def test_get_user_by_id(self, client, admin_token):
|
||||
"""Test getting a specific user."""
|
||||
response = client.get(
|
||||
"/api/users/00000000-0000-0000-0000-000000000001",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["email"] == "ymirliu@panjit.com.tw"
|
||||
|
||||
def test_get_nonexistent_user(self, client, admin_token):
|
||||
"""Test getting a user that doesn't exist."""
|
||||
response = client.get(
|
||||
"/api/users/nonexistent-id",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
def test_update_user(self, client, admin_token, db):
|
||||
"""Test updating a user."""
|
||||
# Create a test user
|
||||
test_user = User(
|
||||
id="test-user-001",
|
||||
email="test@example.com",
|
||||
name="Test User",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(test_user)
|
||||
db.commit()
|
||||
|
||||
response = client.patch(
|
||||
"/api/users/test-user-001",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={"name": "Updated Name"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["name"] == "Updated Name"
|
||||
|
||||
def test_cannot_modify_system_admin_as_non_admin(self, client, db, mock_redis):
|
||||
"""Test that non-admin cannot modify system admin."""
|
||||
from app.core.security import create_access_token, create_token_payload
|
||||
|
||||
# Create a non-admin user
|
||||
non_admin = User(
|
||||
id="non-admin-001",
|
||||
email="nonadmin@example.com",
|
||||
name="Non Admin",
|
||||
role_id="00000000-0000-0000-0000-000000000003", # engineer role
|
||||
is_active=True,
|
||||
is_system_admin=False,
|
||||
)
|
||||
db.add(non_admin)
|
||||
db.commit()
|
||||
|
||||
# Create token for non-admin
|
||||
token_data = create_token_payload(
|
||||
user_id="non-admin-001",
|
||||
email="nonadmin@example.com",
|
||||
role="engineer",
|
||||
department_id=None,
|
||||
is_system_admin=False,
|
||||
)
|
||||
token = create_access_token(token_data)
|
||||
mock_redis.setex("session:non-admin-001", 900, token)
|
||||
|
||||
# Try to modify system admin - should fail with 403
|
||||
response = client.patch(
|
||||
"/api/users/00000000-0000-0000-0000-000000000001",
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
json={"name": "Hacked Name"},
|
||||
)
|
||||
# Engineer role doesn't have users.write permission
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
class TestDepartmentIsolation:
|
||||
"""Test department-based access control."""
|
||||
|
||||
def test_department_isolation(self, client, db, mock_redis):
|
||||
"""Test that users can only see users in their department."""
|
||||
from app.core.security import create_access_token, create_token_payload
|
||||
|
||||
# Create departments
|
||||
dept_a = Department(id="dept-a", name="Department A")
|
||||
dept_b = Department(id="dept-b", name="Department B")
|
||||
db.add_all([dept_a, dept_b])
|
||||
|
||||
# Create manager role
|
||||
from app.models.role import Role
|
||||
manager_role = Role(
|
||||
id="manager-role",
|
||||
name="manager",
|
||||
permissions={"users.read": True, "users.write": True},
|
||||
)
|
||||
db.add(manager_role)
|
||||
|
||||
# Create users in different departments
|
||||
user_a = User(
|
||||
id="user-a",
|
||||
email="usera@example.com",
|
||||
name="User A",
|
||||
department_id="dept-a",
|
||||
role_id="manager-role",
|
||||
is_active=True,
|
||||
)
|
||||
user_b = User(
|
||||
id="user-b",
|
||||
email="userb@example.com",
|
||||
name="User B",
|
||||
department_id="dept-b",
|
||||
role_id="manager-role",
|
||||
is_active=True,
|
||||
)
|
||||
db.add_all([user_a, user_b])
|
||||
db.commit()
|
||||
|
||||
# Create token for user A
|
||||
token_data = create_token_payload(
|
||||
user_id="user-a",
|
||||
email="usera@example.com",
|
||||
role="manager",
|
||||
department_id="dept-a",
|
||||
is_system_admin=False,
|
||||
)
|
||||
token = create_access_token(token_data)
|
||||
mock_redis.setex("session:user-a", 900, token)
|
||||
|
||||
# User A should only see users in dept-a
|
||||
response = client.get(
|
||||
"/api/users",
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
|
||||
# Should only contain user A (filtered by department)
|
||||
emails = [u["email"] for u in data]
|
||||
assert "usera@example.com" in emails
|
||||
assert "userb@example.com" not in emails
|
||||
Reference in New Issue
Block a user