Files
PROJECT-CONTORL/backend/tests/test_users.py
beabigegg 35c90fe76b feat: implement 5 QA-driven security and quality proposals
Implemented proposals from comprehensive QA review:

1. extend-csrf-protection
   - Add POST to CSRF protected methods in frontend
   - Global CSRF middleware for all state-changing operations
   - Update tests with CSRF token fixtures

2. tighten-cors-websocket-security
   - Replace wildcard CORS with explicit method/header lists
   - Disable query parameter auth in production (code 4002)
   - Add per-user WebSocket connection limit (max 5, code 4005)

3. shorten-jwt-expiry
   - Reduce JWT expiry from 7 days to 60 minutes
   - Add refresh token support with 7-day expiry
   - Implement token rotation on refresh
   - Frontend auto-refresh when token near expiry (<5 min)

4. fix-frontend-quality
   - Add React.lazy() code splitting for all pages
   - Fix useCallback dependency arrays (Dashboard, Comments)
   - Add localStorage data validation in AuthContext
   - Complete i18n for AttachmentUpload component

5. enhance-backend-validation
   - Add SecurityAuditMiddleware for access denied logging
   - Add ErrorSanitizerMiddleware for production error messages
   - Protect /health/detailed with admin authentication
   - Add input length validation (comment 5000, desc 10000)

All 521 backend tests passing. Frontend builds successfully.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 23:19:05 +08:00

422 lines
14 KiB
Python

import pytest
from app.models.user import User
from app.models.department import Department
from app.core.security import generate_csrf_token
class TestUserEndpoints:
"""Test user management API endpoints."""
def test_list_users_as_admin(self, client, admin_token):
"""Test listing users as admin."""
response = client.get(
"/api/users",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) >= 1 # At least admin user exists
def test_get_user_by_id(self, client, admin_token):
"""Test getting a specific user."""
response = client.get(
"/api/users/00000000-0000-0000-0000-000000000001",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "ymirliu@panjit.com.tw"
def test_get_nonexistent_user(self, client, admin_token):
"""Test getting a user that doesn't exist."""
response = client.get(
"/api/users/nonexistent-id",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 404
def test_update_user(self, client, auth_headers, db):
"""Test updating a user."""
# Create a test user
test_user = User(
id="test-user-001",
email="test@example.com",
name="Test User",
is_active=True,
)
db.add(test_user)
db.commit()
response = client.patch(
"/api/users/test-user-001",
headers=auth_headers,
json={"name": "Updated Name"},
)
assert response.status_code == 200
data = response.json()
assert data["name"] == "Updated Name"
def test_cannot_modify_system_admin_as_non_admin(self, client, db, mock_redis):
"""Test that non-admin cannot modify system admin."""
from app.core.security import create_access_token, create_token_payload
# Create a non-admin user
non_admin = User(
id="non-admin-001",
email="nonadmin@example.com",
name="Non Admin",
role_id="00000000-0000-0000-0000-000000000003", # engineer role
is_active=True,
is_system_admin=False,
)
db.add(non_admin)
db.commit()
# Create token for non-admin
token_data = create_token_payload(
user_id="non-admin-001",
email="nonadmin@example.com",
role="engineer",
department_id=None,
is_system_admin=False,
)
token = create_access_token(token_data)
mock_redis.setex("session:non-admin-001", 900, token)
# Try to modify system admin - should fail with 403
csrf_token = generate_csrf_token("non-admin-001")
response = client.patch(
"/api/users/00000000-0000-0000-0000-000000000001",
headers={"Authorization": f"Bearer {token}", "X-CSRF-Token": csrf_token},
json={"name": "Hacked Name"},
)
# Engineer role doesn't have users.write permission
assert response.status_code == 403
class TestCapacityUpdate:
"""Test user capacity update API endpoint."""
def test_update_own_capacity(self, client, db, mock_redis):
"""Test that a user can update their own capacity."""
from app.core.security import create_access_token, create_token_payload
# Create a test user
test_user = User(
id="capacity-user-001",
email="capacityuser@example.com",
name="Capacity User",
is_active=True,
capacity=40.00,
)
db.add(test_user)
db.commit()
# Create token for the user
token_data = create_token_payload(
user_id="capacity-user-001",
email="capacityuser@example.com",
role="engineer",
department_id=None,
is_system_admin=False,
)
token = create_access_token(token_data)
mock_redis.setex("session:capacity-user-001", 900, token)
# Update own capacity
csrf_token = generate_csrf_token("capacity-user-001")
response = client.put(
"/api/users/capacity-user-001/capacity",
headers={"Authorization": f"Bearer {token}", "X-CSRF-Token": csrf_token},
json={"capacity_hours": 35.5},
)
assert response.status_code == 200
data = response.json()
assert float(data["capacity"]) == 35.5
def test_admin_can_update_other_user_capacity(self, client, auth_headers, db):
"""Test that admin can update another user's capacity."""
# Create a test user
test_user = User(
id="capacity-user-002",
email="capacityuser2@example.com",
name="Capacity User 2",
is_active=True,
capacity=40.00,
)
db.add(test_user)
db.commit()
# Admin updates another user's capacity
response = client.put(
"/api/users/capacity-user-002/capacity",
headers=auth_headers,
json={"capacity_hours": 20.0},
)
assert response.status_code == 200
data = response.json()
assert float(data["capacity"]) == 20.0
def test_non_admin_cannot_update_other_user_capacity(self, client, db, mock_redis):
"""Test that a non-admin user cannot update another user's capacity."""
from app.core.security import create_access_token, create_token_payload
# Create two test users
user1 = User(
id="capacity-user-003",
email="capacityuser3@example.com",
name="Capacity User 3",
is_active=True,
capacity=40.00,
)
user2 = User(
id="capacity-user-004",
email="capacityuser4@example.com",
name="Capacity User 4",
is_active=True,
capacity=40.00,
)
db.add_all([user1, user2])
db.commit()
# Create token for user1
token_data = create_token_payload(
user_id="capacity-user-003",
email="capacityuser3@example.com",
role="engineer",
department_id=None,
is_system_admin=False,
)
token = create_access_token(token_data)
mock_redis.setex("session:capacity-user-003", 900, token)
# User1 tries to update user2's capacity - should fail
csrf_token = generate_csrf_token("capacity-user-003")
response = client.put(
"/api/users/capacity-user-004/capacity",
headers={"Authorization": f"Bearer {token}", "X-CSRF-Token": csrf_token},
json={"capacity_hours": 30.0},
)
assert response.status_code == 403
assert "Only admin, manager, or the user themselves" in response.json()["detail"]
def test_update_capacity_invalid_value_negative(self, client, auth_headers, db):
"""Test that negative capacity hours are rejected."""
# Create a test user
test_user = User(
id="capacity-user-005",
email="capacityuser5@example.com",
name="Capacity User 5",
is_active=True,
capacity=40.00,
)
db.add(test_user)
db.commit()
response = client.put(
"/api/users/capacity-user-005/capacity",
headers=auth_headers,
json={"capacity_hours": -5.0},
)
# Pydantic validation returns 422 Unprocessable Entity
assert response.status_code == 422
error_detail = response.json()["detail"]
# Check validation error message in Pydantic format
assert any("non-negative" in str(err).lower() for err in error_detail)
def test_update_capacity_invalid_value_too_high(self, client, auth_headers, db):
"""Test that capacity hours exceeding 168 are rejected."""
# Create a test user
test_user = User(
id="capacity-user-006",
email="capacityuser6@example.com",
name="Capacity User 6",
is_active=True,
capacity=40.00,
)
db.add(test_user)
db.commit()
response = client.put(
"/api/users/capacity-user-006/capacity",
headers=auth_headers,
json={"capacity_hours": 200.0},
)
# Pydantic validation returns 422 Unprocessable Entity
assert response.status_code == 422
error_detail = response.json()["detail"]
# Check validation error message in Pydantic format
assert any("168" in str(err) for err in error_detail)
def test_update_capacity_nonexistent_user(self, client, auth_headers):
"""Test updating capacity for a nonexistent user."""
response = client.put(
"/api/users/nonexistent-user-id/capacity",
headers=auth_headers,
json={"capacity_hours": 40.0},
)
assert response.status_code == 404
assert "User not found" in response.json()["detail"]
def test_manager_can_update_other_user_capacity(self, client, db, mock_redis):
"""Test that manager can update another user's capacity."""
from app.core.security import create_access_token, create_token_payload
from app.models.role import Role
# Create manager role if not exists
manager_role = db.query(Role).filter(Role.name == "manager").first()
if not manager_role:
manager_role = Role(
id="manager-role-cap",
name="manager",
permissions={"users.read": True, "users.write": True},
)
db.add(manager_role)
db.commit()
# Create a manager user
manager_user = User(
id="manager-cap-001",
email="managercap@example.com",
name="Manager Cap",
role_id=manager_role.id,
is_active=True,
is_system_admin=False,
)
# Create a regular user
regular_user = User(
id="regular-cap-001",
email="regularcap@example.com",
name="Regular Cap",
is_active=True,
capacity=40.00,
)
db.add_all([manager_user, regular_user])
db.commit()
# Create token for manager
token_data = create_token_payload(
user_id="manager-cap-001",
email="managercap@example.com",
role="manager",
department_id=None,
is_system_admin=False,
)
token = create_access_token(token_data)
mock_redis.setex("session:manager-cap-001", 900, token)
# Manager updates regular user's capacity
csrf_token = generate_csrf_token("manager-cap-001")
response = client.put(
"/api/users/regular-cap-001/capacity",
headers={"Authorization": f"Bearer {token}", "X-CSRF-Token": csrf_token},
json={"capacity_hours": 30.0},
)
assert response.status_code == 200
data = response.json()
assert float(data["capacity"]) == 30.0
def test_capacity_change_creates_audit_log(self, client, auth_headers, db):
"""Test that capacity changes are recorded in audit trail."""
from app.models import AuditLog
# Create a test user
test_user = User(
id="capacity-audit-001",
email="capacityaudit@example.com",
name="Capacity Audit User",
is_active=True,
capacity=40.00,
)
db.add(test_user)
db.commit()
# Update capacity
response = client.put(
"/api/users/capacity-audit-001/capacity",
headers=auth_headers,
json={"capacity_hours": 35.0},
)
assert response.status_code == 200
# Check audit log was created
audit_log = db.query(AuditLog).filter(
AuditLog.resource_id == "capacity-audit-001",
AuditLog.event_type == "user.capacity_change"
).first()
assert audit_log is not None
assert audit_log.resource_type == "user"
assert audit_log.action == "update"
assert len(audit_log.changes) == 1
assert audit_log.changes[0]["field"] == "capacity"
assert audit_log.changes[0]["old_value"] == 40.0
assert audit_log.changes[0]["new_value"] == 35.0
class TestDepartmentIsolation:
"""Test department-based access control."""
def test_department_isolation(self, client, db, mock_redis):
"""Test that users can only see users in their department."""
from app.core.security import create_access_token, create_token_payload
# Create departments
dept_a = Department(id="dept-a", name="Department A")
dept_b = Department(id="dept-b", name="Department B")
db.add_all([dept_a, dept_b])
# Create manager role
from app.models.role import Role
manager_role = Role(
id="manager-role",
name="manager",
permissions={"users.read": True, "users.write": True},
)
db.add(manager_role)
# Create users in different departments
user_a = User(
id="user-a",
email="usera@example.com",
name="User A",
department_id="dept-a",
role_id="manager-role",
is_active=True,
)
user_b = User(
id="user-b",
email="userb@example.com",
name="User B",
department_id="dept-b",
role_id="manager-role",
is_active=True,
)
db.add_all([user_a, user_b])
db.commit()
# Create token for user A
token_data = create_token_payload(
user_id="user-a",
email="usera@example.com",
role="manager",
department_id="dept-a",
is_system_admin=False,
)
token = create_access_token(token_data)
mock_redis.setex("session:user-a", 900, token)
# User A should only see users in dept-a
response = client.get(
"/api/users",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200
data = response.json()
# Should only contain user A (filtered by department)
emails = [u["email"] for u in data]
assert "usera@example.com" in emails
assert "userb@example.com" not in emails