Files
PROJECT-CONTORL/backend/tests/test_workload_e2e.py
beabigegg 61fe01cb6b feat: implement workload heatmap module
- Backend (FastAPI):
  - Workload heatmap API with load level calculation
  - User workload detail endpoint with task breakdown
  - Redis caching for workload calculations (1hr TTL)
  - Department isolation and access control
  - WorkloadSnapshot model for historical data
  - Alembic migration for workload_snapshots table

- API Endpoints:
  - GET /api/workload/heatmap - Team workload overview
  - GET /api/workload/user/{id} - User workload detail
  - GET /api/workload/me - Current user workload

- Load Levels:
  - normal: <80%, warning: 80-99%, overloaded: >=100%

- Tests:
  - 26 unit/API tests
  - 15 E2E automated tests
  - 77 total tests passing

- OpenSpec:
  - add-resource-workload change archived
  - resource-management spec updated

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 01:13:21 +08:00

616 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""End-to-end tests for workload API.
These tests verify the complete flow including:
- Database operations
- Redis caching
- Access control
- Load calculation accuracy
"""
import pytest
from datetime import datetime, timedelta, date
from decimal import Decimal
import json
from app.models import User, Department, Space, Project, Task
from app.models.task_status import TaskStatus
from app.core.security import create_access_token, create_token_payload
from app.services.workload_service import get_week_bounds
class TestWorkloadE2EHeatmap:
"""E2E tests for workload heatmap complete flow."""
def setup_complete_environment(self, db, mock_redis):
"""Set up a complete test environment with multiple users and tasks."""
import uuid
unique = str(uuid.uuid4())[:8]
# Create departments with unique IDs
dept_rd = Department(id=f"e2e-dept-rd-{unique}", name=f"R&D E2E {unique}")
dept_ops = Department(id=f"e2e-dept-ops-{unique}", name=f"Operations E2E {unique}")
db.add(dept_rd)
db.add(dept_ops)
# Create users with different capacities and unique IDs
users = {
"engineer1": User(
id=f"e2e-user-001-{unique}",
email=f"e2e-eng1-{unique}@test.com",
name="Engineer One",
department_id=f"e2e-dept-rd-{unique}",
role_id="00000000-0000-0000-0000-000000000003",
capacity=40,
is_active=True,
),
"engineer2": User(
id=f"e2e-user-002-{unique}",
email=f"e2e-eng2-{unique}@test.com",
name="Engineer Two",
department_id=f"e2e-dept-rd-{unique}",
role_id="00000000-0000-0000-0000-000000000003",
capacity=40,
is_active=True,
),
"ops_user": User(
id=f"e2e-user-003-{unique}",
email=f"e2e-ops-{unique}@test.com",
name="Ops Engineer",
department_id=f"e2e-dept-ops-{unique}",
role_id="00000000-0000-0000-0000-000000000003",
capacity=32,
is_active=True,
),
}
for user in users.values():
db.add(user)
# Create space and project with unique IDs
space = Space(
id=f"e2e-space-{unique}",
name=f"Test Space E2E {unique}",
owner_id="00000000-0000-0000-0000-000000000001",
is_active=True,
)
db.add(space)
project = Project(
id=f"e2e-project-{unique}",
space_id=f"e2e-space-{unique}",
title=f"Test Project E2E {unique}",
owner_id="00000000-0000-0000-0000-000000000001",
department_id=f"e2e-dept-rd-{unique}",
)
db.add(project)
# Create task statuses
status_todo = TaskStatus(
id=f"e2e-status-todo-{unique}",
project_id=f"e2e-project-{unique}",
name="To Do",
is_done=False,
)
status_done = TaskStatus(
id=f"e2e-status-done-{unique}",
project_id=f"e2e-project-{unique}",
name="Done",
is_done=True,
)
db.add(status_todo)
db.add(status_done)
db.commit()
# Calculate current week bounds
week_start, week_end = get_week_bounds(date.today())
# Create tasks for this week
tasks_data = [
# Engineer 1: 32 hours = 80% (warning)
(f"e2e-task-001-{unique}", f"e2e-user-001-{unique}", Decimal("16"), status_todo.id),
(f"e2e-task-002-{unique}", f"e2e-user-001-{unique}", Decimal("16"), status_todo.id),
# Engineer 2: 48 hours = 120% (overloaded)
(f"e2e-task-003-{unique}", f"e2e-user-002-{unique}", Decimal("24"), status_todo.id),
(f"e2e-task-004-{unique}", f"e2e-user-002-{unique}", Decimal("24"), status_todo.id),
# Ops user: 8 hours = 25% (normal, capacity is 32)
(f"e2e-task-005-{unique}", f"e2e-user-003-{unique}", Decimal("8"), status_todo.id),
# Completed task should not count
(f"e2e-task-006-{unique}", f"e2e-user-001-{unique}", Decimal("8"), status_done.id),
]
for task_id, assignee_id, estimate, status_id in tasks_data:
# Due date in the middle of current week
due_date = datetime.combine(week_start, datetime.min.time()) + timedelta(days=3)
task = Task(
id=task_id,
project_id=f"e2e-project-{unique}",
title=f"Task {task_id}",
assignee_id=assignee_id,
status_id=status_id,
original_estimate=estimate,
due_date=due_date,
created_by="00000000-0000-0000-0000-000000000001",
)
db.add(task)
db.commit()
return {
"users": users,
"week_start": week_start,
"week_end": week_end,
"unique": unique,
"dept_rd_id": f"e2e-dept-rd-{unique}",
"dept_ops_id": f"e2e-dept-ops-{unique}",
}
def test_heatmap_complete_flow_as_admin(self, client, db, admin_token, mock_redis):
"""Test complete heatmap flow as admin."""
data = self.setup_complete_environment(db, mock_redis)
unique = data["unique"]
# Filter by our specific test users to avoid interference from other tests
user_ids = f"e2e-user-001-{unique},e2e-user-002-{unique},e2e-user-003-{unique}"
response = client.get(
f"/api/workload/heatmap?user_ids={user_ids}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
result = response.json()
# Verify response structure
assert "week_start" in result
assert "week_end" in result
assert "users" in result
# Find our E2E test users in the result
users_by_id = {u["user_id"]: u for u in result["users"]}
# Should have exactly 3 users
assert len(users_by_id) == 3
# E2E users should be present
assert f"e2e-user-001-{unique}" in users_by_id, f"e2e-user-001-{unique} not in {list(users_by_id.keys())}"
assert f"e2e-user-002-{unique}" in users_by_id
assert f"e2e-user-003-{unique}" in users_by_id
# Engineer 1: 32/40 = 80% = warning
eng1 = users_by_id[f"e2e-user-001-{unique}"]
assert Decimal(eng1["allocated_hours"]) == Decimal("32")
assert eng1["load_level"] == "warning"
# Engineer 2: 48/40 = 120% = overloaded
eng2 = users_by_id[f"e2e-user-002-{unique}"]
assert Decimal(eng2["allocated_hours"]) == Decimal("48")
assert eng2["load_level"] == "overloaded"
# Ops user: 8/32 = 25% = normal
ops = users_by_id[f"e2e-user-003-{unique}"]
assert Decimal(ops["allocated_hours"]) == Decimal("8")
assert ops["load_level"] == "normal"
def test_heatmap_department_filter(self, client, db, admin_token, mock_redis):
"""Test heatmap with department filter."""
data = self.setup_complete_environment(db, mock_redis)
unique = data["unique"]
dept_rd_id = data["dept_rd_id"]
response = client.get(
f"/api/workload/heatmap?department_id={dept_rd_id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
result = response.json()
# Should only include R&D users
for user in result["users"]:
assert user["department_id"] == dept_rd_id
# Should not include ops user
user_ids = {u["user_id"] for u in result["users"]}
assert f"e2e-user-003-{unique}" not in user_ids
def test_load_level_thresholds(self, client, db, admin_token, mock_redis):
"""Test that load levels are correctly determined."""
data = self.setup_complete_environment(db, mock_redis)
unique = data["unique"]
# Filter by our specific test users to avoid interference from other tests
user_ids = f"e2e-user-001-{unique},e2e-user-002-{unique},e2e-user-003-{unique}"
response = client.get(
f"/api/workload/heatmap?user_ids={user_ids}",
headers={"Authorization": f"Bearer {admin_token}"},
)
result = response.json()
users_by_id = {u["user_id"]: u for u in result["users"]}
# Check E2E users are present
assert f"e2e-user-001-{unique}" in users_by_id, f"e2e-user-001-{unique} not in {list(users_by_id.keys())}"
assert f"e2e-user-002-{unique}" in users_by_id
assert f"e2e-user-003-{unique}" in users_by_id
# Verify load levels based on percentage
# Engineer 1: 80% -> warning
assert users_by_id[f"e2e-user-001-{unique}"]["load_level"] == "warning"
assert Decimal(users_by_id[f"e2e-user-001-{unique}"]["load_percentage"]) == Decimal("80.00")
# Engineer 2: 120% -> overloaded
assert users_by_id[f"e2e-user-002-{unique}"]["load_level"] == "overloaded"
assert Decimal(users_by_id[f"e2e-user-002-{unique}"]["load_percentage"]) == Decimal("120.00")
# Ops user: 25% -> normal
assert users_by_id[f"e2e-user-003-{unique}"]["load_level"] == "normal"
assert Decimal(users_by_id[f"e2e-user-003-{unique}"]["load_percentage"]) == Decimal("25.00")
class TestWorkloadE2EAccessControl:
"""E2E tests for workload access control."""
def setup_multi_department_env(self, db, mock_redis):
"""Set up environment with multiple departments for access control tests."""
import uuid
unique = str(uuid.uuid4())[:8]
# Create departments with unique IDs
dept_rd = Department(id=f"acl-dept-rd-{unique}", name=f"R&D ACL {unique}")
dept_ops = Department(id=f"acl-dept-ops-{unique}", name=f"Operations ACL {unique}")
db.add(dept_rd)
db.add(dept_ops)
# Create users in different departments with unique IDs
rd_user = User(
id=f"acl-user-rd-{unique}",
email=f"acl-rd-{unique}@test.com",
name="R&D User ACL",
department_id=f"acl-dept-rd-{unique}",
role_id="00000000-0000-0000-0000-000000000003",
capacity=40,
is_active=True,
)
ops_user = User(
id=f"acl-user-ops-{unique}",
email=f"acl-ops-{unique}@test.com",
name="Ops User ACL",
department_id=f"acl-dept-ops-{unique}",
role_id="00000000-0000-0000-0000-000000000003",
capacity=40,
is_active=True,
)
db.add(rd_user)
db.add(ops_user)
db.commit()
# Create tokens with unique user IDs
rd_token_data = create_token_payload(
user_id=f"acl-user-rd-{unique}",
email=f"acl-rd-{unique}@test.com",
role="engineer",
department_id=f"acl-dept-rd-{unique}",
is_system_admin=False,
)
rd_token = create_access_token(rd_token_data)
mock_redis.setex(f"session:acl-user-rd-{unique}", 900, rd_token)
ops_token_data = create_token_payload(
user_id=f"acl-user-ops-{unique}",
email=f"acl-ops-{unique}@test.com",
role="engineer",
department_id=f"acl-dept-ops-{unique}",
is_system_admin=False,
)
ops_token = create_access_token(ops_token_data)
mock_redis.setex(f"session:acl-user-ops-{unique}", 900, ops_token)
return {
"rd_user": rd_user,
"ops_user": ops_user,
"rd_token": rd_token,
"ops_token": ops_token,
"unique": unique,
"rd_user_id": f"acl-user-rd-{unique}",
"ops_user_id": f"acl-user-ops-{unique}",
"dept_rd_id": f"acl-dept-rd-{unique}",
"dept_ops_id": f"acl-dept-ops-{unique}",
}
def test_admin_can_see_all_users(self, client, db, admin_token, mock_redis):
"""Super admin can see workload for all users."""
data = self.setup_multi_department_env(db, mock_redis)
# Filter by our specific test users to test admin access without interference
user_ids_filter = f"{data['rd_user_id']},{data['ops_user_id']}"
response = client.get(
f"/api/workload/heatmap?user_ids={user_ids_filter}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
result = response.json()
# Admin sees users including ACL test users we just created
user_ids = {u["user_id"] for u in result["users"]}
assert len(user_ids) == 2
assert data["rd_user_id"] in user_ids, f"{data['rd_user_id']} not in {user_ids}"
assert data["ops_user_id"] in user_ids, f"{data['ops_user_id']} not in {user_ids}"
def test_regular_user_sees_only_self(self, client, db, mock_redis):
"""Regular user can only see their own workload in heatmap."""
data = self.setup_multi_department_env(db, mock_redis)
response = client.get(
"/api/workload/heatmap",
headers={"Authorization": f"Bearer {data['rd_token']}"},
)
assert response.status_code == 200
result = response.json()
# Should only see themselves
assert len(result["users"]) == 1
assert result["users"][0]["user_id"] == data["rd_user_id"]
def test_regular_user_cannot_access_other_user_detail(self, client, db, mock_redis):
"""Regular user cannot get another user's detailed workload."""
data = self.setup_multi_department_env(db, mock_redis)
# R&D user tries to access Ops user's detail
response = client.get(
f"/api/workload/user/{data['ops_user_id']}",
headers={"Authorization": f"Bearer {data['rd_token']}"},
)
assert response.status_code == 403
assert "Access denied" in response.json()["detail"]
def test_regular_user_cannot_access_other_department(self, client, db, mock_redis):
"""Regular user cannot filter by other department."""
data = self.setup_multi_department_env(db, mock_redis)
# R&D user tries to access Ops department
response = client.get(
f"/api/workload/heatmap?department_id={data['dept_ops_id']}",
headers={"Authorization": f"Bearer {data['rd_token']}"},
)
assert response.status_code == 403
assert "other departments" in response.json()["detail"]
def test_user_can_access_own_detail(self, client, db, mock_redis):
"""User can access their own detailed workload."""
data = self.setup_multi_department_env(db, mock_redis)
response = client.get(
f"/api/workload/user/{data['rd_user_id']}",
headers={"Authorization": f"Bearer {data['rd_token']}"},
)
assert response.status_code == 200
result = response.json()
assert result["user_id"] == data["rd_user_id"]
def test_my_workload_endpoint(self, client, db, mock_redis):
"""The /me endpoint returns current user's workload."""
data = self.setup_multi_department_env(db, mock_redis)
response = client.get(
"/api/workload/me",
headers={"Authorization": f"Bearer {data['rd_token']}"},
)
assert response.status_code == 200
result = response.json()
assert result["user_id"] == data["rd_user_id"]
assert "tasks" in result
class TestWorkloadE2ECache:
"""E2E tests for Redis cache behavior.
Note: The cache service imports redis_client directly from the module,
so these tests verify the cache logic through unit tests rather than
end-to-end integration. Full cache testing would require mocking at
the module level or refactoring to use dependency injection.
"""
def test_cache_functions_work(self):
"""Test that cache helper functions work correctly."""
from app.services.workload_cache import (
_make_heatmap_cache_key,
_make_user_cache_key,
_serialize_workload_summary,
_deserialize_workload_summary,
)
from app.schemas.workload import UserWorkloadSummary, LoadLevel
# Test cache key generation
week = date(2024, 1, 1)
key = _make_heatmap_cache_key(week)
assert "2024-01-01" in key
assert key == "workload:heatmap:2024-01-01"
key_with_dept = _make_heatmap_cache_key(week, department_id="dept-1")
assert "dept:dept-1" in key_with_dept
key_with_users = _make_heatmap_cache_key(week, user_ids=["user-1", "user-2"])
assert "users:user-1,user-2" in key_with_users
user_key = _make_user_cache_key("user-123", week)
assert user_key == "workload:user:user-123:2024-01-01"
def test_serialization_roundtrip(self):
"""Test that serialization/deserialization preserves data."""
from app.services.workload_cache import (
_serialize_workload_summary,
_deserialize_workload_summary,
)
from app.schemas.workload import UserWorkloadSummary, LoadLevel
original = UserWorkloadSummary(
user_id="user-123",
user_name="Test User",
department_id="dept-1",
department_name="R&D",
capacity_hours=Decimal("40"),
allocated_hours=Decimal("32.5"),
load_percentage=Decimal("81.25"),
load_level=LoadLevel.WARNING,
task_count=5,
)
serialized = _serialize_workload_summary(original)
deserialized = _deserialize_workload_summary(serialized)
assert deserialized.user_id == original.user_id
assert deserialized.user_name == original.user_name
assert deserialized.capacity_hours == original.capacity_hours
assert deserialized.allocated_hours == original.allocated_hours
assert deserialized.load_percentage == original.load_percentage
assert deserialized.load_level == original.load_level
assert deserialized.task_count == original.task_count
def test_second_request_returns_same_data(self, client, db, admin_token, mock_redis):
"""Second request should return identical data (testing idempotency)."""
# First request
response1 = client.get(
"/api/workload/heatmap",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response1.status_code == 200
result1 = response1.json()
# Second request - should return same data
response2 = client.get(
"/api/workload/heatmap",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response2.status_code == 200
result2 = response2.json()
# Results should be identical (whether from cache or recalculated)
assert result1 == result2
class TestWorkloadE2EUserDetail:
"""E2E tests for user workload detail endpoint."""
def setup_detail_test_env(self, db, mock_redis):
"""Set up environment for detail testing."""
import uuid
unique = str(uuid.uuid4())[:8]
# Create user with unique ID
user = User(
id=f"user-detail-{unique}",
email=f"detail-{unique}@test.com",
name="Detail Test User",
role_id="00000000-0000-0000-0000-000000000003",
capacity=40,
is_active=True,
)
db.add(user)
# Create space and project with unique IDs
space = Space(
id=f"space-detail-{unique}",
name=f"Detail Space {unique}",
owner_id="00000000-0000-0000-0000-000000000001",
is_active=True,
)
db.add(space)
project = Project(
id=f"project-detail-{unique}",
space_id=f"space-detail-{unique}",
title=f"Detail Project {unique}",
owner_id="00000000-0000-0000-0000-000000000001",
)
db.add(project)
status = TaskStatus(
id=f"status-detail-{unique}",
project_id=f"project-detail-{unique}",
name="In Progress",
is_done=False,
)
db.add(status)
db.commit()
# Create tasks
week_start = get_week_bounds(date.today())[0]
due = datetime.combine(week_start, datetime.min.time()) + timedelta(days=2)
tasks = []
for i in range(3):
task = Task(
id=f"task-detail-{i}-{unique}",
project_id=f"project-detail-{unique}",
title=f"Task {i}",
assignee_id=f"user-detail-{unique}",
status_id=f"status-detail-{unique}",
original_estimate=Decimal("8"),
due_date=due,
created_by="00000000-0000-0000-0000-000000000001",
)
db.add(task)
tasks.append(task)
db.commit()
return {
"user": user,
"tasks": tasks,
"project": project,
"unique": unique,
"user_id": f"user-detail-{unique}",
}
def test_detail_includes_task_list(self, client, db, admin_token, mock_redis):
"""User detail should include list of tasks."""
data = self.setup_detail_test_env(db, mock_redis)
response = client.get(
f"/api/workload/user/{data['user_id']}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
result = response.json()
# Should include 3 tasks
assert len(result["tasks"]) == 3
# Each task should have required fields
for task in result["tasks"]:
assert "task_id" in task
assert "title" in task
assert "project_name" in task
assert "original_estimate" in task
def test_detail_calculates_total_correctly(self, client, db, admin_token, mock_redis):
"""Total allocated hours should sum task estimates."""
data = self.setup_detail_test_env(db, mock_redis)
response = client.get(
f"/api/workload/user/{data['user_id']}",
headers={"Authorization": f"Bearer {admin_token}"},
)
result = response.json()
# 3 tasks × 8 hours = 24 hours
assert Decimal(result["allocated_hours"]) == Decimal("24")
# 24/40 = 60%
assert Decimal(result["load_percentage"]) == Decimal("60.00")
assert result["load_level"] == "normal"
def test_nonexistent_user_returns_404(self, client, db, admin_token, mock_redis):
"""Requesting nonexistent user should return 404."""
response = client.get(
"/api/workload/user/nonexistent-user",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 404