## Critical Issues (CRIT-001~003) - All Fixed
- JWT secret key validation with pydantic field_validator
- Login audit logging for success/failure attempts
- Frontend API path prefix removal
## High Priority Issues (HIGH-001~008) - All Fixed
- Project soft delete using is_active flag
- Redis session token bytes handling
- Rate limiting with slowapi (5 req/min for login)
- Attachment API permission checks
- Kanban view with drag-and-drop
- Workload heatmap UI (WorkloadPage, WorkloadHeatmap)
- TaskDetailModal integrating Comments/Attachments
- UserSelect component for task assignment
## Medium Priority Issues (MED-001~012) - All Fixed
- MED-001~005: DB commits, N+1 queries, datetime, error format, blocker flag
- MED-006: Project health dashboard (HealthService, ProjectHealthPage)
- MED-007: Capacity update API (PUT /api/users/{id}/capacity)
- MED-008: Schedule triggers (cron parsing, deadline reminders)
- MED-009: Watermark feature (image/PDF watermarking)
- MED-010~012: useEffect deps, DOM operations, PDF export
## New Files
- backend/app/api/health/ - Project health API
- backend/app/services/health_service.py
- backend/app/services/trigger_scheduler.py
- backend/app/services/watermark_service.py
- backend/app/core/rate_limiter.py
- frontend/src/pages/ProjectHealthPage.tsx
- frontend/src/components/ProjectHealthCard.tsx
- frontend/src/components/KanbanBoard.tsx
- frontend/src/components/WorkloadHeatmap.tsx
## Tests
- 113 new tests passing (health: 32, users: 14, triggers: 35, watermark: 32)
## OpenSpec Archives
- add-project-health-dashboard
- add-capacity-update-api
- add-schedule-triggers
- add-watermark-feature
- add-rate-limiting
- enhance-frontend-ux
- add-resource-management-ui
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
417 lines
14 KiB
Python
417 lines
14 KiB
Python
import pytest
|
|
from app.models.user import User
|
|
from app.models.department import Department
|
|
|
|
|
|
class TestUserEndpoints:
|
|
"""Test user management API endpoints."""
|
|
|
|
def test_list_users_as_admin(self, client, admin_token):
|
|
"""Test listing users as admin."""
|
|
response = client.get(
|
|
"/api/users",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert isinstance(data, list)
|
|
assert len(data) >= 1 # At least admin user exists
|
|
|
|
def test_get_user_by_id(self, client, admin_token):
|
|
"""Test getting a specific user."""
|
|
response = client.get(
|
|
"/api/users/00000000-0000-0000-0000-000000000001",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["email"] == "ymirliu@panjit.com.tw"
|
|
|
|
def test_get_nonexistent_user(self, client, admin_token):
|
|
"""Test getting a user that doesn't exist."""
|
|
response = client.get(
|
|
"/api/users/nonexistent-id",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
def test_update_user(self, client, admin_token, db):
|
|
"""Test updating a user."""
|
|
# Create a test user
|
|
test_user = User(
|
|
id="test-user-001",
|
|
email="test@example.com",
|
|
name="Test User",
|
|
is_active=True,
|
|
)
|
|
db.add(test_user)
|
|
db.commit()
|
|
|
|
response = client.patch(
|
|
"/api/users/test-user-001",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
json={"name": "Updated Name"},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["name"] == "Updated Name"
|
|
|
|
def test_cannot_modify_system_admin_as_non_admin(self, client, db, mock_redis):
|
|
"""Test that non-admin cannot modify system admin."""
|
|
from app.core.security import create_access_token, create_token_payload
|
|
|
|
# Create a non-admin user
|
|
non_admin = User(
|
|
id="non-admin-001",
|
|
email="nonadmin@example.com",
|
|
name="Non Admin",
|
|
role_id="00000000-0000-0000-0000-000000000003", # engineer role
|
|
is_active=True,
|
|
is_system_admin=False,
|
|
)
|
|
db.add(non_admin)
|
|
db.commit()
|
|
|
|
# Create token for non-admin
|
|
token_data = create_token_payload(
|
|
user_id="non-admin-001",
|
|
email="nonadmin@example.com",
|
|
role="engineer",
|
|
department_id=None,
|
|
is_system_admin=False,
|
|
)
|
|
token = create_access_token(token_data)
|
|
mock_redis.setex("session:non-admin-001", 900, token)
|
|
|
|
# Try to modify system admin - should fail with 403
|
|
response = client.patch(
|
|
"/api/users/00000000-0000-0000-0000-000000000001",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
json={"name": "Hacked Name"},
|
|
)
|
|
# Engineer role doesn't have users.write permission
|
|
assert response.status_code == 403
|
|
|
|
|
|
class TestCapacityUpdate:
|
|
"""Test user capacity update API endpoint."""
|
|
|
|
def test_update_own_capacity(self, client, db, mock_redis):
|
|
"""Test that a user can update their own capacity."""
|
|
from app.core.security import create_access_token, create_token_payload
|
|
|
|
# Create a test user
|
|
test_user = User(
|
|
id="capacity-user-001",
|
|
email="capacityuser@example.com",
|
|
name="Capacity User",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
db.add(test_user)
|
|
db.commit()
|
|
|
|
# Create token for the user
|
|
token_data = create_token_payload(
|
|
user_id="capacity-user-001",
|
|
email="capacityuser@example.com",
|
|
role="engineer",
|
|
department_id=None,
|
|
is_system_admin=False,
|
|
)
|
|
token = create_access_token(token_data)
|
|
mock_redis.setex("session:capacity-user-001", 900, token)
|
|
|
|
# Update own capacity
|
|
response = client.put(
|
|
"/api/users/capacity-user-001/capacity",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
json={"capacity_hours": 35.5},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert float(data["capacity"]) == 35.5
|
|
|
|
def test_admin_can_update_other_user_capacity(self, client, admin_token, db):
|
|
"""Test that admin can update another user's capacity."""
|
|
# Create a test user
|
|
test_user = User(
|
|
id="capacity-user-002",
|
|
email="capacityuser2@example.com",
|
|
name="Capacity User 2",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
db.add(test_user)
|
|
db.commit()
|
|
|
|
# Admin updates another user's capacity
|
|
response = client.put(
|
|
"/api/users/capacity-user-002/capacity",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
json={"capacity_hours": 20.0},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert float(data["capacity"]) == 20.0
|
|
|
|
def test_non_admin_cannot_update_other_user_capacity(self, client, db, mock_redis):
|
|
"""Test that a non-admin user cannot update another user's capacity."""
|
|
from app.core.security import create_access_token, create_token_payload
|
|
|
|
# Create two test users
|
|
user1 = User(
|
|
id="capacity-user-003",
|
|
email="capacityuser3@example.com",
|
|
name="Capacity User 3",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
user2 = User(
|
|
id="capacity-user-004",
|
|
email="capacityuser4@example.com",
|
|
name="Capacity User 4",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
db.add_all([user1, user2])
|
|
db.commit()
|
|
|
|
# Create token for user1
|
|
token_data = create_token_payload(
|
|
user_id="capacity-user-003",
|
|
email="capacityuser3@example.com",
|
|
role="engineer",
|
|
department_id=None,
|
|
is_system_admin=False,
|
|
)
|
|
token = create_access_token(token_data)
|
|
mock_redis.setex("session:capacity-user-003", 900, token)
|
|
|
|
# User1 tries to update user2's capacity - should fail
|
|
response = client.put(
|
|
"/api/users/capacity-user-004/capacity",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
json={"capacity_hours": 30.0},
|
|
)
|
|
assert response.status_code == 403
|
|
assert "Only admin, manager, or the user themselves" in response.json()["detail"]
|
|
|
|
def test_update_capacity_invalid_value_negative(self, client, admin_token, db):
|
|
"""Test that negative capacity hours are rejected."""
|
|
# Create a test user
|
|
test_user = User(
|
|
id="capacity-user-005",
|
|
email="capacityuser5@example.com",
|
|
name="Capacity User 5",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
db.add(test_user)
|
|
db.commit()
|
|
|
|
response = client.put(
|
|
"/api/users/capacity-user-005/capacity",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
json={"capacity_hours": -5.0},
|
|
)
|
|
# Pydantic validation returns 422 Unprocessable Entity
|
|
assert response.status_code == 422
|
|
error_detail = response.json()["detail"]
|
|
# Check validation error message in Pydantic format
|
|
assert any("non-negative" in str(err).lower() for err in error_detail)
|
|
|
|
def test_update_capacity_invalid_value_too_high(self, client, admin_token, db):
|
|
"""Test that capacity hours exceeding 168 are rejected."""
|
|
# Create a test user
|
|
test_user = User(
|
|
id="capacity-user-006",
|
|
email="capacityuser6@example.com",
|
|
name="Capacity User 6",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
db.add(test_user)
|
|
db.commit()
|
|
|
|
response = client.put(
|
|
"/api/users/capacity-user-006/capacity",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
json={"capacity_hours": 200.0},
|
|
)
|
|
# Pydantic validation returns 422 Unprocessable Entity
|
|
assert response.status_code == 422
|
|
error_detail = response.json()["detail"]
|
|
# Check validation error message in Pydantic format
|
|
assert any("168" in str(err) for err in error_detail)
|
|
|
|
def test_update_capacity_nonexistent_user(self, client, admin_token):
|
|
"""Test updating capacity for a nonexistent user."""
|
|
response = client.put(
|
|
"/api/users/nonexistent-user-id/capacity",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
json={"capacity_hours": 40.0},
|
|
)
|
|
assert response.status_code == 404
|
|
assert "User not found" in response.json()["detail"]
|
|
|
|
def test_manager_can_update_other_user_capacity(self, client, db, mock_redis):
|
|
"""Test that manager can update another user's capacity."""
|
|
from app.core.security import create_access_token, create_token_payload
|
|
from app.models.role import Role
|
|
|
|
# Create manager role if not exists
|
|
manager_role = db.query(Role).filter(Role.name == "manager").first()
|
|
if not manager_role:
|
|
manager_role = Role(
|
|
id="manager-role-cap",
|
|
name="manager",
|
|
permissions={"users.read": True, "users.write": True},
|
|
)
|
|
db.add(manager_role)
|
|
db.commit()
|
|
|
|
# Create a manager user
|
|
manager_user = User(
|
|
id="manager-cap-001",
|
|
email="managercap@example.com",
|
|
name="Manager Cap",
|
|
role_id=manager_role.id,
|
|
is_active=True,
|
|
is_system_admin=False,
|
|
)
|
|
# Create a regular user
|
|
regular_user = User(
|
|
id="regular-cap-001",
|
|
email="regularcap@example.com",
|
|
name="Regular Cap",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
db.add_all([manager_user, regular_user])
|
|
db.commit()
|
|
|
|
# Create token for manager
|
|
token_data = create_token_payload(
|
|
user_id="manager-cap-001",
|
|
email="managercap@example.com",
|
|
role="manager",
|
|
department_id=None,
|
|
is_system_admin=False,
|
|
)
|
|
token = create_access_token(token_data)
|
|
mock_redis.setex("session:manager-cap-001", 900, token)
|
|
|
|
# Manager updates regular user's capacity
|
|
response = client.put(
|
|
"/api/users/regular-cap-001/capacity",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
json={"capacity_hours": 30.0},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert float(data["capacity"]) == 30.0
|
|
|
|
def test_capacity_change_creates_audit_log(self, client, admin_token, db):
|
|
"""Test that capacity changes are recorded in audit trail."""
|
|
from app.models import AuditLog
|
|
|
|
# Create a test user
|
|
test_user = User(
|
|
id="capacity-audit-001",
|
|
email="capacityaudit@example.com",
|
|
name="Capacity Audit User",
|
|
is_active=True,
|
|
capacity=40.00,
|
|
)
|
|
db.add(test_user)
|
|
db.commit()
|
|
|
|
# Update capacity
|
|
response = client.put(
|
|
"/api/users/capacity-audit-001/capacity",
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
json={"capacity_hours": 35.0},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# Check audit log was created
|
|
audit_log = db.query(AuditLog).filter(
|
|
AuditLog.resource_id == "capacity-audit-001",
|
|
AuditLog.event_type == "user.capacity_change"
|
|
).first()
|
|
|
|
assert audit_log is not None
|
|
assert audit_log.resource_type == "user"
|
|
assert audit_log.action == "update"
|
|
assert len(audit_log.changes) == 1
|
|
assert audit_log.changes[0]["field"] == "capacity"
|
|
assert audit_log.changes[0]["old_value"] == 40.0
|
|
assert audit_log.changes[0]["new_value"] == 35.0
|
|
|
|
|
|
class TestDepartmentIsolation:
|
|
"""Test department-based access control."""
|
|
|
|
def test_department_isolation(self, client, db, mock_redis):
|
|
"""Test that users can only see users in their department."""
|
|
from app.core.security import create_access_token, create_token_payload
|
|
|
|
# Create departments
|
|
dept_a = Department(id="dept-a", name="Department A")
|
|
dept_b = Department(id="dept-b", name="Department B")
|
|
db.add_all([dept_a, dept_b])
|
|
|
|
# Create manager role
|
|
from app.models.role import Role
|
|
manager_role = Role(
|
|
id="manager-role",
|
|
name="manager",
|
|
permissions={"users.read": True, "users.write": True},
|
|
)
|
|
db.add(manager_role)
|
|
|
|
# Create users in different departments
|
|
user_a = User(
|
|
id="user-a",
|
|
email="usera@example.com",
|
|
name="User A",
|
|
department_id="dept-a",
|
|
role_id="manager-role",
|
|
is_active=True,
|
|
)
|
|
user_b = User(
|
|
id="user-b",
|
|
email="userb@example.com",
|
|
name="User B",
|
|
department_id="dept-b",
|
|
role_id="manager-role",
|
|
is_active=True,
|
|
)
|
|
db.add_all([user_a, user_b])
|
|
db.commit()
|
|
|
|
# Create token for user A
|
|
token_data = create_token_payload(
|
|
user_id="user-a",
|
|
email="usera@example.com",
|
|
role="manager",
|
|
department_id="dept-a",
|
|
is_system_admin=False,
|
|
)
|
|
token = create_access_token(token_data)
|
|
mock_redis.setex("session:user-a", 900, token)
|
|
|
|
# User A should only see users in dept-a
|
|
response = client.get(
|
|
"/api/users",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Should only contain user A (filtered by department)
|
|
emails = [u["email"] for u in data]
|
|
assert "usera@example.com" in emails
|
|
assert "userb@example.com" not in emails
|