import pytest from app.models.user import User from app.models.department import Department class TestUserEndpoints: """Test user management API endpoints.""" def test_list_users_as_admin(self, client, admin_token): """Test listing users as admin.""" response = client.get( "/api/users", headers={"Authorization": f"Bearer {admin_token}"}, ) assert response.status_code == 200 data = response.json() assert isinstance(data, list) assert len(data) >= 1 # At least admin user exists def test_get_user_by_id(self, client, admin_token): """Test getting a specific user.""" response = client.get( "/api/users/00000000-0000-0000-0000-000000000001", headers={"Authorization": f"Bearer {admin_token}"}, ) assert response.status_code == 200 data = response.json() assert data["email"] == "ymirliu@panjit.com.tw" def test_get_nonexistent_user(self, client, admin_token): """Test getting a user that doesn't exist.""" response = client.get( "/api/users/nonexistent-id", headers={"Authorization": f"Bearer {admin_token}"}, ) assert response.status_code == 404 def test_update_user(self, client, admin_token, db): """Test updating a user.""" # Create a test user test_user = User( id="test-user-001", email="test@example.com", name="Test User", is_active=True, ) db.add(test_user) db.commit() response = client.patch( "/api/users/test-user-001", headers={"Authorization": f"Bearer {admin_token}"}, json={"name": "Updated Name"}, ) assert response.status_code == 200 data = response.json() assert data["name"] == "Updated Name" def test_cannot_modify_system_admin_as_non_admin(self, client, db, mock_redis): """Test that non-admin cannot modify system admin.""" from app.core.security import create_access_token, create_token_payload # Create a non-admin user non_admin = User( id="non-admin-001", email="nonadmin@example.com", name="Non Admin", role_id="00000000-0000-0000-0000-000000000003", # engineer role is_active=True, is_system_admin=False, ) db.add(non_admin) db.commit() # Create token for non-admin token_data = create_token_payload( user_id="non-admin-001", email="nonadmin@example.com", role="engineer", department_id=None, is_system_admin=False, ) token = create_access_token(token_data) mock_redis.setex("session:non-admin-001", 900, token) # Try to modify system admin - should fail with 403 response = client.patch( "/api/users/00000000-0000-0000-0000-000000000001", headers={"Authorization": f"Bearer {token}"}, json={"name": "Hacked Name"}, ) # Engineer role doesn't have users.write permission assert response.status_code == 403 class TestCapacityUpdate: """Test user capacity update API endpoint.""" def test_update_own_capacity(self, client, db, mock_redis): """Test that a user can update their own capacity.""" from app.core.security import create_access_token, create_token_payload # Create a test user test_user = User( id="capacity-user-001", email="capacityuser@example.com", name="Capacity User", is_active=True, capacity=40.00, ) db.add(test_user) db.commit() # Create token for the user token_data = create_token_payload( user_id="capacity-user-001", email="capacityuser@example.com", role="engineer", department_id=None, is_system_admin=False, ) token = create_access_token(token_data) mock_redis.setex("session:capacity-user-001", 900, token) # Update own capacity response = client.put( "/api/users/capacity-user-001/capacity", headers={"Authorization": f"Bearer {token}"}, json={"capacity_hours": 35.5}, ) assert response.status_code == 200 data = response.json() assert float(data["capacity"]) == 35.5 def test_admin_can_update_other_user_capacity(self, client, admin_token, db): """Test that admin can update another user's capacity.""" # Create a test user test_user = User( id="capacity-user-002", email="capacityuser2@example.com", name="Capacity User 2", is_active=True, capacity=40.00, ) db.add(test_user) db.commit() # Admin updates another user's capacity response = client.put( "/api/users/capacity-user-002/capacity", headers={"Authorization": f"Bearer {admin_token}"}, json={"capacity_hours": 20.0}, ) assert response.status_code == 200 data = response.json() assert float(data["capacity"]) == 20.0 def test_non_admin_cannot_update_other_user_capacity(self, client, db, mock_redis): """Test that a non-admin user cannot update another user's capacity.""" from app.core.security import create_access_token, create_token_payload # Create two test users user1 = User( id="capacity-user-003", email="capacityuser3@example.com", name="Capacity User 3", is_active=True, capacity=40.00, ) user2 = User( id="capacity-user-004", email="capacityuser4@example.com", name="Capacity User 4", is_active=True, capacity=40.00, ) db.add_all([user1, user2]) db.commit() # Create token for user1 token_data = create_token_payload( user_id="capacity-user-003", email="capacityuser3@example.com", role="engineer", department_id=None, is_system_admin=False, ) token = create_access_token(token_data) mock_redis.setex("session:capacity-user-003", 900, token) # User1 tries to update user2's capacity - should fail response = client.put( "/api/users/capacity-user-004/capacity", headers={"Authorization": f"Bearer {token}"}, json={"capacity_hours": 30.0}, ) assert response.status_code == 403 assert "Only admin, manager, or the user themselves" in response.json()["detail"] def test_update_capacity_invalid_value_negative(self, client, admin_token, db): """Test that negative capacity hours are rejected.""" # Create a test user test_user = User( id="capacity-user-005", email="capacityuser5@example.com", name="Capacity User 5", is_active=True, capacity=40.00, ) db.add(test_user) db.commit() response = client.put( "/api/users/capacity-user-005/capacity", headers={"Authorization": f"Bearer {admin_token}"}, json={"capacity_hours": -5.0}, ) # Pydantic validation returns 422 Unprocessable Entity assert response.status_code == 422 error_detail = response.json()["detail"] # Check validation error message in Pydantic format assert any("non-negative" in str(err).lower() for err in error_detail) def test_update_capacity_invalid_value_too_high(self, client, admin_token, db): """Test that capacity hours exceeding 168 are rejected.""" # Create a test user test_user = User( id="capacity-user-006", email="capacityuser6@example.com", name="Capacity User 6", is_active=True, capacity=40.00, ) db.add(test_user) db.commit() response = client.put( "/api/users/capacity-user-006/capacity", headers={"Authorization": f"Bearer {admin_token}"}, json={"capacity_hours": 200.0}, ) # Pydantic validation returns 422 Unprocessable Entity assert response.status_code == 422 error_detail = response.json()["detail"] # Check validation error message in Pydantic format assert any("168" in str(err) for err in error_detail) def test_update_capacity_nonexistent_user(self, client, admin_token): """Test updating capacity for a nonexistent user.""" response = client.put( "/api/users/nonexistent-user-id/capacity", headers={"Authorization": f"Bearer {admin_token}"}, json={"capacity_hours": 40.0}, ) assert response.status_code == 404 assert "User not found" in response.json()["detail"] def test_manager_can_update_other_user_capacity(self, client, db, mock_redis): """Test that manager can update another user's capacity.""" from app.core.security import create_access_token, create_token_payload from app.models.role import Role # Create manager role if not exists manager_role = db.query(Role).filter(Role.name == "manager").first() if not manager_role: manager_role = Role( id="manager-role-cap", name="manager", permissions={"users.read": True, "users.write": True}, ) db.add(manager_role) db.commit() # Create a manager user manager_user = User( id="manager-cap-001", email="managercap@example.com", name="Manager Cap", role_id=manager_role.id, is_active=True, is_system_admin=False, ) # Create a regular user regular_user = User( id="regular-cap-001", email="regularcap@example.com", name="Regular Cap", is_active=True, capacity=40.00, ) db.add_all([manager_user, regular_user]) db.commit() # Create token for manager token_data = create_token_payload( user_id="manager-cap-001", email="managercap@example.com", role="manager", department_id=None, is_system_admin=False, ) token = create_access_token(token_data) mock_redis.setex("session:manager-cap-001", 900, token) # Manager updates regular user's capacity response = client.put( "/api/users/regular-cap-001/capacity", headers={"Authorization": f"Bearer {token}"}, json={"capacity_hours": 30.0}, ) assert response.status_code == 200 data = response.json() assert float(data["capacity"]) == 30.0 def test_capacity_change_creates_audit_log(self, client, admin_token, db): """Test that capacity changes are recorded in audit trail.""" from app.models import AuditLog # Create a test user test_user = User( id="capacity-audit-001", email="capacityaudit@example.com", name="Capacity Audit User", is_active=True, capacity=40.00, ) db.add(test_user) db.commit() # Update capacity response = client.put( "/api/users/capacity-audit-001/capacity", headers={"Authorization": f"Bearer {admin_token}"}, json={"capacity_hours": 35.0}, ) assert response.status_code == 200 # Check audit log was created audit_log = db.query(AuditLog).filter( AuditLog.resource_id == "capacity-audit-001", AuditLog.event_type == "user.capacity_change" ).first() assert audit_log is not None assert audit_log.resource_type == "user" assert audit_log.action == "update" assert len(audit_log.changes) == 1 assert audit_log.changes[0]["field"] == "capacity" assert audit_log.changes[0]["old_value"] == 40.0 assert audit_log.changes[0]["new_value"] == 35.0 class TestDepartmentIsolation: """Test department-based access control.""" def test_department_isolation(self, client, db, mock_redis): """Test that users can only see users in their department.""" from app.core.security import create_access_token, create_token_payload # Create departments dept_a = Department(id="dept-a", name="Department A") dept_b = Department(id="dept-b", name="Department B") db.add_all([dept_a, dept_b]) # Create manager role from app.models.role import Role manager_role = Role( id="manager-role", name="manager", permissions={"users.read": True, "users.write": True}, ) db.add(manager_role) # Create users in different departments user_a = User( id="user-a", email="usera@example.com", name="User A", department_id="dept-a", role_id="manager-role", is_active=True, ) user_b = User( id="user-b", email="userb@example.com", name="User B", department_id="dept-b", role_id="manager-role", is_active=True, ) db.add_all([user_a, user_b]) db.commit() # Create token for user A token_data = create_token_payload( user_id="user-a", email="usera@example.com", role="manager", department_id="dept-a", is_system_admin=False, ) token = create_access_token(token_data) mock_redis.setex("session:user-a", 900, token) # User A should only see users in dept-a response = client.get( "/api/users", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 data = response.json() # Should only contain user A (filtered by department) emails = [u["email"] for u in data] assert "usera@example.com" in emails assert "userb@example.com" not in emails