feat: implement workload heatmap module
- Backend (FastAPI):
- Workload heatmap API with load level calculation
- User workload detail endpoint with task breakdown
- Redis caching for workload calculations (1hr TTL)
- Department isolation and access control
- WorkloadSnapshot model for historical data
- Alembic migration for workload_snapshots table
- API Endpoints:
- GET /api/workload/heatmap - Team workload overview
- GET /api/workload/user/{id} - User workload detail
- GET /api/workload/me - Current user workload
- Load Levels:
- normal: <80%, warning: 80-99%, overloaded: >=100%
- Tests:
- 26 unit/API tests
- 15 E2E automated tests
- 77 total tests passing
- OpenSpec:
- add-resource-workload change archived
- resource-management spec updated
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -29,6 +29,9 @@ class MockRedis:
|
||||
def get(self, key):
|
||||
return self.store.get(key)
|
||||
|
||||
def set(self, key, value):
|
||||
self.store[key] = value
|
||||
|
||||
def setex(self, key, seconds, value):
|
||||
self.store[key] = value
|
||||
|
||||
@@ -36,6 +39,17 @@ class MockRedis:
|
||||
if key in self.store:
|
||||
del self.store[key]
|
||||
|
||||
def scan_iter(self, match=None):
|
||||
"""Iterate over keys matching a pattern."""
|
||||
import fnmatch
|
||||
if match is None:
|
||||
yield from self.store.keys()
|
||||
else:
|
||||
pattern = match.replace("*", "**")
|
||||
for key in self.store.keys():
|
||||
if fnmatch.fnmatch(key, match):
|
||||
yield key
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def db():
|
||||
|
||||
537
backend/tests/test_workload.py
Normal file
537
backend/tests/test_workload.py
Normal file
@@ -0,0 +1,537 @@
|
||||
"""Tests for workload API and service."""
|
||||
import pytest
|
||||
from datetime import date, datetime, timedelta
|
||||
from decimal import Decimal
|
||||
|
||||
from app.models import User, Department, Space, Project, Task
|
||||
from app.models.task_status import TaskStatus
|
||||
from app.services.workload_service import (
|
||||
get_week_bounds,
|
||||
get_current_week_start,
|
||||
determine_load_level,
|
||||
calculate_load_percentage,
|
||||
calculate_user_workload,
|
||||
get_workload_heatmap,
|
||||
get_user_workload_detail,
|
||||
)
|
||||
from app.schemas.workload import LoadLevel
|
||||
|
||||
|
||||
class TestWeekBounds:
|
||||
"""Tests for week boundary calculations."""
|
||||
|
||||
def test_get_week_bounds_monday(self):
|
||||
"""Monday should return same day as week start."""
|
||||
monday = date(2024, 1, 1) # This is a Monday
|
||||
week_start, week_end = get_week_bounds(monday)
|
||||
assert week_start == monday
|
||||
assert week_end == date(2024, 1, 7)
|
||||
|
||||
def test_get_week_bounds_wednesday(self):
|
||||
"""Wednesday should return previous Monday as week start."""
|
||||
wednesday = date(2024, 1, 3)
|
||||
week_start, week_end = get_week_bounds(wednesday)
|
||||
assert week_start == date(2024, 1, 1)
|
||||
assert week_end == date(2024, 1, 7)
|
||||
|
||||
def test_get_week_bounds_sunday(self):
|
||||
"""Sunday should return previous Monday as week start."""
|
||||
sunday = date(2024, 1, 7)
|
||||
week_start, week_end = get_week_bounds(sunday)
|
||||
assert week_start == date(2024, 1, 1)
|
||||
assert week_end == date(2024, 1, 7)
|
||||
|
||||
def test_get_current_week_start(self):
|
||||
"""Current week start should be a Monday."""
|
||||
week_start = get_current_week_start()
|
||||
# Monday = 0
|
||||
assert week_start.weekday() == 0
|
||||
|
||||
|
||||
class TestLoadLevel:
|
||||
"""Tests for load level determination."""
|
||||
|
||||
def test_load_level_normal(self):
|
||||
"""Load below 80% should be normal."""
|
||||
assert determine_load_level(Decimal("0")) == LoadLevel.NORMAL
|
||||
assert determine_load_level(Decimal("50")) == LoadLevel.NORMAL
|
||||
assert determine_load_level(Decimal("79.99")) == LoadLevel.NORMAL
|
||||
|
||||
def test_load_level_warning(self):
|
||||
"""Load 80-99% should be warning."""
|
||||
assert determine_load_level(Decimal("80")) == LoadLevel.WARNING
|
||||
assert determine_load_level(Decimal("90")) == LoadLevel.WARNING
|
||||
assert determine_load_level(Decimal("99.99")) == LoadLevel.WARNING
|
||||
|
||||
def test_load_level_overloaded(self):
|
||||
"""Load 100%+ should be overloaded."""
|
||||
assert determine_load_level(Decimal("100")) == LoadLevel.OVERLOADED
|
||||
assert determine_load_level(Decimal("150")) == LoadLevel.OVERLOADED
|
||||
|
||||
def test_load_level_unavailable(self):
|
||||
"""None percentage should be unavailable."""
|
||||
assert determine_load_level(None) == LoadLevel.UNAVAILABLE
|
||||
|
||||
|
||||
class TestLoadPercentage:
|
||||
"""Tests for load percentage calculation."""
|
||||
|
||||
def test_normal_calculation(self):
|
||||
"""Normal calculation should work."""
|
||||
result = calculate_load_percentage(Decimal("32"), Decimal("40"))
|
||||
assert result == Decimal("80.00")
|
||||
|
||||
def test_zero_capacity(self):
|
||||
"""Zero capacity should return None."""
|
||||
result = calculate_load_percentage(Decimal("32"), Decimal("0"))
|
||||
assert result is None
|
||||
|
||||
def test_zero_allocated(self):
|
||||
"""Zero allocated should return 0."""
|
||||
result = calculate_load_percentage(Decimal("0"), Decimal("40"))
|
||||
assert result == Decimal("0.00")
|
||||
|
||||
|
||||
class TestWorkloadService:
|
||||
"""Tests for workload service with database."""
|
||||
|
||||
def setup_test_data(self, db):
|
||||
"""Set up test data for workload tests."""
|
||||
# Create department
|
||||
dept = Department(
|
||||
id="dept-001",
|
||||
name="R&D",
|
||||
)
|
||||
db.add(dept)
|
||||
|
||||
# Create engineer user
|
||||
engineer = User(
|
||||
id="user-engineer-001",
|
||||
email="engineer@test.com",
|
||||
name="Test Engineer",
|
||||
department_id="dept-001",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
is_system_admin=False,
|
||||
)
|
||||
db.add(engineer)
|
||||
|
||||
# Create space
|
||||
space = Space(
|
||||
id="space-001",
|
||||
name="Test Space",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(space)
|
||||
|
||||
# Create project
|
||||
project = Project(
|
||||
id="project-001",
|
||||
space_id="space-001",
|
||||
title="Test Project",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
department_id="dept-001",
|
||||
security_level="department",
|
||||
)
|
||||
db.add(project)
|
||||
|
||||
# Create task status (not done)
|
||||
status_todo = TaskStatus(
|
||||
id="status-todo",
|
||||
project_id="project-001",
|
||||
name="To Do",
|
||||
is_done=False,
|
||||
)
|
||||
db.add(status_todo)
|
||||
|
||||
status_done = TaskStatus(
|
||||
id="status-done",
|
||||
project_id="project-001",
|
||||
name="Done",
|
||||
is_done=True,
|
||||
)
|
||||
db.add(status_done)
|
||||
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"department": dept,
|
||||
"engineer": engineer,
|
||||
"space": space,
|
||||
"project": project,
|
||||
"status_todo": status_todo,
|
||||
"status_done": status_done,
|
||||
}
|
||||
|
||||
def create_task(self, db, data, task_id, estimate, due_date, status_id=None, done=False):
|
||||
"""Helper to create a task."""
|
||||
task = Task(
|
||||
id=task_id,
|
||||
project_id=data["project"].id,
|
||||
title=f"Task {task_id}",
|
||||
assignee_id=data["engineer"].id,
|
||||
status_id=status_id or (data["status_done"].id if done else data["status_todo"].id),
|
||||
original_estimate=estimate,
|
||||
due_date=due_date,
|
||||
created_by="00000000-0000-0000-0000-000000000001",
|
||||
)
|
||||
db.add(task)
|
||||
db.commit()
|
||||
return task
|
||||
|
||||
def test_calculate_user_workload_empty(self, db):
|
||||
"""User with no tasks should have 0 allocated hours."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
week_start = date(2024, 1, 1)
|
||||
summary = calculate_user_workload(db, data["engineer"], week_start)
|
||||
|
||||
assert summary.user_id == data["engineer"].id
|
||||
assert summary.allocated_hours == Decimal("0")
|
||||
assert summary.capacity_hours == Decimal("40")
|
||||
assert summary.load_percentage == Decimal("0.00")
|
||||
assert summary.load_level == LoadLevel.NORMAL
|
||||
assert summary.task_count == 0
|
||||
|
||||
def test_calculate_user_workload_with_tasks(self, db):
|
||||
"""User with tasks should have correct allocated hours."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
# Create tasks due in the week of 2024-01-01
|
||||
week_start = date(2024, 1, 1)
|
||||
due = datetime(2024, 1, 3, 12, 0, 0) # Wednesday
|
||||
|
||||
self.create_task(db, data, "task-1", Decimal("8"), due)
|
||||
self.create_task(db, data, "task-2", Decimal("16"), due)
|
||||
|
||||
summary = calculate_user_workload(db, data["engineer"], week_start)
|
||||
|
||||
assert summary.allocated_hours == Decimal("24")
|
||||
assert summary.load_percentage == Decimal("60.00")
|
||||
assert summary.load_level == LoadLevel.NORMAL
|
||||
assert summary.task_count == 2
|
||||
|
||||
def test_calculate_user_workload_overloaded(self, db):
|
||||
"""User with too many tasks should be overloaded."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
week_start = date(2024, 1, 1)
|
||||
due = datetime(2024, 1, 3, 12, 0, 0)
|
||||
|
||||
# 48 hours > 40 capacity = overloaded
|
||||
self.create_task(db, data, "task-1", Decimal("24"), due)
|
||||
self.create_task(db, data, "task-2", Decimal("24"), due)
|
||||
|
||||
summary = calculate_user_workload(db, data["engineer"], week_start)
|
||||
|
||||
assert summary.allocated_hours == Decimal("48")
|
||||
assert summary.load_percentage == Decimal("120.00")
|
||||
assert summary.load_level == LoadLevel.OVERLOADED
|
||||
|
||||
def test_completed_tasks_excluded(self, db):
|
||||
"""Completed tasks should not count toward workload."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
week_start = date(2024, 1, 1)
|
||||
due = datetime(2024, 1, 3, 12, 0, 0)
|
||||
|
||||
self.create_task(db, data, "task-1", Decimal("8"), due, done=False)
|
||||
self.create_task(db, data, "task-2", Decimal("16"), due, done=True) # Done
|
||||
|
||||
summary = calculate_user_workload(db, data["engineer"], week_start)
|
||||
|
||||
assert summary.allocated_hours == Decimal("8") # Only uncompleted task
|
||||
assert summary.task_count == 1
|
||||
|
||||
def test_tasks_outside_week_excluded(self, db):
|
||||
"""Tasks due outside the week should not count."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
week_start = date(2024, 1, 1)
|
||||
|
||||
# Task due in this week
|
||||
self.create_task(db, data, "task-1", Decimal("8"), datetime(2024, 1, 3, 12, 0, 0))
|
||||
# Task due next week
|
||||
self.create_task(db, data, "task-2", Decimal("16"), datetime(2024, 1, 10, 12, 0, 0))
|
||||
|
||||
summary = calculate_user_workload(db, data["engineer"], week_start)
|
||||
|
||||
assert summary.allocated_hours == Decimal("8") # Only this week's task
|
||||
assert summary.task_count == 1
|
||||
|
||||
def test_get_workload_heatmap(self, db):
|
||||
"""Heatmap should return all matching users."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
week_start = date(2024, 1, 1)
|
||||
due = datetime(2024, 1, 3, 12, 0, 0)
|
||||
|
||||
self.create_task(db, data, "task-1", Decimal("32"), due)
|
||||
|
||||
# Get heatmap for the department
|
||||
summaries = get_workload_heatmap(
|
||||
db=db,
|
||||
week_start=week_start,
|
||||
department_id="dept-001",
|
||||
)
|
||||
|
||||
# Should include engineer (not admin, admin has no department)
|
||||
assert len(summaries) == 1
|
||||
assert summaries[0].user_id == data["engineer"].id
|
||||
assert summaries[0].load_level == LoadLevel.WARNING # 80%
|
||||
|
||||
def test_get_user_workload_detail(self, db):
|
||||
"""Detail should include task list."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
week_start = date(2024, 1, 1)
|
||||
due = datetime(2024, 1, 3, 12, 0, 0)
|
||||
|
||||
self.create_task(db, data, "task-1", Decimal("8"), due)
|
||||
self.create_task(db, data, "task-2", Decimal("16"), due)
|
||||
|
||||
detail = get_user_workload_detail(db, data["engineer"].id, week_start)
|
||||
|
||||
assert detail is not None
|
||||
assert detail.user_id == data["engineer"].id
|
||||
assert len(detail.tasks) == 2
|
||||
assert detail.allocated_hours == Decimal("24")
|
||||
|
||||
|
||||
class TestWorkloadAPI:
|
||||
"""Tests for workload API endpoints."""
|
||||
|
||||
def setup_test_data(self, db):
|
||||
"""Set up test data for API tests."""
|
||||
# Create department
|
||||
dept = Department(
|
||||
id="dept-001",
|
||||
name="R&D",
|
||||
)
|
||||
db.add(dept)
|
||||
|
||||
# Create engineer user
|
||||
engineer = User(
|
||||
id="user-engineer-001",
|
||||
email="engineer@test.com",
|
||||
name="Test Engineer",
|
||||
department_id="dept-001",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
is_system_admin=False,
|
||||
)
|
||||
db.add(engineer)
|
||||
|
||||
# Create space
|
||||
space = Space(
|
||||
id="space-001",
|
||||
name="Test Space",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(space)
|
||||
|
||||
# Create project
|
||||
project = Project(
|
||||
id="project-001",
|
||||
space_id="space-001",
|
||||
title="Test Project",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
department_id="dept-001",
|
||||
security_level="department",
|
||||
)
|
||||
db.add(project)
|
||||
|
||||
# Create task status
|
||||
status_todo = TaskStatus(
|
||||
id="status-todo",
|
||||
project_id="project-001",
|
||||
name="To Do",
|
||||
is_done=False,
|
||||
)
|
||||
db.add(status_todo)
|
||||
|
||||
# Create a task due this week
|
||||
task = Task(
|
||||
id="task-001",
|
||||
project_id="project-001",
|
||||
title="Test Task",
|
||||
assignee_id="user-engineer-001",
|
||||
status_id="status-todo",
|
||||
original_estimate=Decimal("32"),
|
||||
due_date=datetime.now() + timedelta(days=1),
|
||||
created_by="00000000-0000-0000-0000-000000000001",
|
||||
)
|
||||
db.add(task)
|
||||
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"department": dept,
|
||||
"engineer": engineer,
|
||||
}
|
||||
|
||||
def test_heatmap_as_admin(self, client, db, admin_token):
|
||||
"""Admin should see all users in heatmap."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
response = client.get(
|
||||
"/api/workload/heatmap",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
assert "week_start" in result
|
||||
assert "week_end" in result
|
||||
assert "users" in result
|
||||
# Admin sees all users including the engineer
|
||||
assert len(result["users"]) >= 1
|
||||
|
||||
def test_heatmap_with_department_filter(self, client, db, admin_token):
|
||||
"""Admin can filter by department."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
response = client.get(
|
||||
"/api/workload/heatmap?department_id=dept-001",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
# Should only include users in dept-001
|
||||
for user in result["users"]:
|
||||
assert user["department_id"] == "dept-001"
|
||||
|
||||
def test_my_workload(self, client, db, admin_token):
|
||||
"""User can get their own workload."""
|
||||
response = client.get(
|
||||
"/api/workload/me",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
assert result["user_id"] == "00000000-0000-0000-0000-000000000001"
|
||||
assert "tasks" in result
|
||||
|
||||
def test_user_workload_detail(self, client, db, admin_token):
|
||||
"""Admin can get any user's workload detail."""
|
||||
data = self.setup_test_data(db)
|
||||
|
||||
response = client.get(
|
||||
f"/api/workload/user/{data['engineer'].id}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
assert result["user_id"] == data["engineer"].id
|
||||
assert len(result["tasks"]) == 1
|
||||
assert result["allocated_hours"] == "32.00" # Decimal comes as string with precision
|
||||
|
||||
def test_unauthorized_access(self, client, db):
|
||||
"""Unauthenticated requests should fail."""
|
||||
response = client.get("/api/workload/heatmap")
|
||||
assert response.status_code == 403 # No auth header
|
||||
|
||||
|
||||
class TestWorkloadAccessControl:
|
||||
"""Tests for workload access control."""
|
||||
|
||||
def setup_test_data(self, db, mock_redis):
|
||||
"""Set up test data with two departments."""
|
||||
from app.core.security import create_access_token, create_token_payload
|
||||
|
||||
# Create departments
|
||||
dept_rd = Department(id="dept-rd", name="R&D")
|
||||
dept_ops = Department(id="dept-ops", name="Operations")
|
||||
db.add(dept_rd)
|
||||
db.add(dept_ops)
|
||||
|
||||
# Create engineer in R&D
|
||||
engineer_rd = User(
|
||||
id="user-rd-001",
|
||||
email="rd@test.com",
|
||||
name="R&D Engineer",
|
||||
department_id="dept-rd",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
is_system_admin=False,
|
||||
)
|
||||
db.add(engineer_rd)
|
||||
|
||||
# Create engineer in Operations
|
||||
engineer_ops = User(
|
||||
id="user-ops-001",
|
||||
email="ops@test.com",
|
||||
name="Ops Engineer",
|
||||
department_id="dept-ops",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
is_system_admin=False,
|
||||
)
|
||||
db.add(engineer_ops)
|
||||
|
||||
db.commit()
|
||||
|
||||
# Create token for R&D engineer
|
||||
token_data = create_token_payload(
|
||||
user_id="user-rd-001",
|
||||
email="rd@test.com",
|
||||
role="engineer",
|
||||
department_id="dept-rd",
|
||||
is_system_admin=False,
|
||||
)
|
||||
rd_token = create_access_token(token_data)
|
||||
mock_redis.setex("session:user-rd-001", 900, rd_token)
|
||||
|
||||
return {
|
||||
"dept_rd": dept_rd,
|
||||
"dept_ops": dept_ops,
|
||||
"engineer_rd": engineer_rd,
|
||||
"engineer_ops": engineer_ops,
|
||||
"rd_token": rd_token,
|
||||
}
|
||||
|
||||
def test_regular_user_sees_only_self(self, client, db, mock_redis):
|
||||
"""Regular user should only see their own workload."""
|
||||
data = self.setup_test_data(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
"/api/workload/heatmap",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
# Should only see themselves
|
||||
assert len(result["users"]) == 1
|
||||
assert result["users"][0]["user_id"] == "user-rd-001"
|
||||
|
||||
def test_regular_user_cannot_access_other_department(self, client, db, mock_redis):
|
||||
"""Regular user should not access other department's workload."""
|
||||
data = self.setup_test_data(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
"/api/workload/heatmap?department_id=dept-ops",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
|
||||
def test_regular_user_cannot_access_other_user_detail(self, client, db, mock_redis):
|
||||
"""Regular user should not access other user's detail."""
|
||||
data = self.setup_test_data(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
f"/api/workload/user/{data['engineer_ops'].id}",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
615
backend/tests/test_workload_e2e.py
Normal file
615
backend/tests/test_workload_e2e.py
Normal file
@@ -0,0 +1,615 @@
|
||||
"""End-to-end tests for workload API.
|
||||
|
||||
These tests verify the complete flow including:
|
||||
- Database operations
|
||||
- Redis caching
|
||||
- Access control
|
||||
- Load calculation accuracy
|
||||
"""
|
||||
import pytest
|
||||
from datetime import datetime, timedelta, date
|
||||
from decimal import Decimal
|
||||
import json
|
||||
|
||||
from app.models import User, Department, Space, Project, Task
|
||||
from app.models.task_status import TaskStatus
|
||||
from app.core.security import create_access_token, create_token_payload
|
||||
from app.services.workload_service import get_week_bounds
|
||||
|
||||
|
||||
class TestWorkloadE2EHeatmap:
|
||||
"""E2E tests for workload heatmap complete flow."""
|
||||
|
||||
def setup_complete_environment(self, db, mock_redis):
|
||||
"""Set up a complete test environment with multiple users and tasks."""
|
||||
import uuid
|
||||
unique = str(uuid.uuid4())[:8]
|
||||
|
||||
# Create departments with unique IDs
|
||||
dept_rd = Department(id=f"e2e-dept-rd-{unique}", name=f"R&D E2E {unique}")
|
||||
dept_ops = Department(id=f"e2e-dept-ops-{unique}", name=f"Operations E2E {unique}")
|
||||
db.add(dept_rd)
|
||||
db.add(dept_ops)
|
||||
|
||||
# Create users with different capacities and unique IDs
|
||||
users = {
|
||||
"engineer1": User(
|
||||
id=f"e2e-user-001-{unique}",
|
||||
email=f"e2e-eng1-{unique}@test.com",
|
||||
name="Engineer One",
|
||||
department_id=f"e2e-dept-rd-{unique}",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
),
|
||||
"engineer2": User(
|
||||
id=f"e2e-user-002-{unique}",
|
||||
email=f"e2e-eng2-{unique}@test.com",
|
||||
name="Engineer Two",
|
||||
department_id=f"e2e-dept-rd-{unique}",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
),
|
||||
"ops_user": User(
|
||||
id=f"e2e-user-003-{unique}",
|
||||
email=f"e2e-ops-{unique}@test.com",
|
||||
name="Ops Engineer",
|
||||
department_id=f"e2e-dept-ops-{unique}",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=32,
|
||||
is_active=True,
|
||||
),
|
||||
}
|
||||
for user in users.values():
|
||||
db.add(user)
|
||||
|
||||
# Create space and project with unique IDs
|
||||
space = Space(
|
||||
id=f"e2e-space-{unique}",
|
||||
name=f"Test Space E2E {unique}",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(space)
|
||||
|
||||
project = Project(
|
||||
id=f"e2e-project-{unique}",
|
||||
space_id=f"e2e-space-{unique}",
|
||||
title=f"Test Project E2E {unique}",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
department_id=f"e2e-dept-rd-{unique}",
|
||||
)
|
||||
db.add(project)
|
||||
|
||||
# Create task statuses
|
||||
status_todo = TaskStatus(
|
||||
id=f"e2e-status-todo-{unique}",
|
||||
project_id=f"e2e-project-{unique}",
|
||||
name="To Do",
|
||||
is_done=False,
|
||||
)
|
||||
status_done = TaskStatus(
|
||||
id=f"e2e-status-done-{unique}",
|
||||
project_id=f"e2e-project-{unique}",
|
||||
name="Done",
|
||||
is_done=True,
|
||||
)
|
||||
db.add(status_todo)
|
||||
db.add(status_done)
|
||||
|
||||
db.commit()
|
||||
|
||||
# Calculate current week bounds
|
||||
week_start, week_end = get_week_bounds(date.today())
|
||||
# Create tasks for this week
|
||||
tasks_data = [
|
||||
# Engineer 1: 32 hours = 80% (warning)
|
||||
(f"e2e-task-001-{unique}", f"e2e-user-001-{unique}", Decimal("16"), status_todo.id),
|
||||
(f"e2e-task-002-{unique}", f"e2e-user-001-{unique}", Decimal("16"), status_todo.id),
|
||||
# Engineer 2: 48 hours = 120% (overloaded)
|
||||
(f"e2e-task-003-{unique}", f"e2e-user-002-{unique}", Decimal("24"), status_todo.id),
|
||||
(f"e2e-task-004-{unique}", f"e2e-user-002-{unique}", Decimal("24"), status_todo.id),
|
||||
# Ops user: 8 hours = 25% (normal, capacity is 32)
|
||||
(f"e2e-task-005-{unique}", f"e2e-user-003-{unique}", Decimal("8"), status_todo.id),
|
||||
# Completed task should not count
|
||||
(f"e2e-task-006-{unique}", f"e2e-user-001-{unique}", Decimal("8"), status_done.id),
|
||||
]
|
||||
|
||||
for task_id, assignee_id, estimate, status_id in tasks_data:
|
||||
# Due date in the middle of current week
|
||||
due_date = datetime.combine(week_start, datetime.min.time()) + timedelta(days=3)
|
||||
task = Task(
|
||||
id=task_id,
|
||||
project_id=f"e2e-project-{unique}",
|
||||
title=f"Task {task_id}",
|
||||
assignee_id=assignee_id,
|
||||
status_id=status_id,
|
||||
original_estimate=estimate,
|
||||
due_date=due_date,
|
||||
created_by="00000000-0000-0000-0000-000000000001",
|
||||
)
|
||||
db.add(task)
|
||||
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"users": users,
|
||||
"week_start": week_start,
|
||||
"week_end": week_end,
|
||||
"unique": unique,
|
||||
"dept_rd_id": f"e2e-dept-rd-{unique}",
|
||||
"dept_ops_id": f"e2e-dept-ops-{unique}",
|
||||
}
|
||||
|
||||
def test_heatmap_complete_flow_as_admin(self, client, db, admin_token, mock_redis):
|
||||
"""Test complete heatmap flow as admin."""
|
||||
data = self.setup_complete_environment(db, mock_redis)
|
||||
unique = data["unique"]
|
||||
|
||||
# Filter by our specific test users to avoid interference from other tests
|
||||
user_ids = f"e2e-user-001-{unique},e2e-user-002-{unique},e2e-user-003-{unique}"
|
||||
response = client.get(
|
||||
f"/api/workload/heatmap?user_ids={user_ids}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
|
||||
# Verify response structure
|
||||
assert "week_start" in result
|
||||
assert "week_end" in result
|
||||
assert "users" in result
|
||||
|
||||
# Find our E2E test users in the result
|
||||
users_by_id = {u["user_id"]: u for u in result["users"]}
|
||||
|
||||
# Should have exactly 3 users
|
||||
assert len(users_by_id) == 3
|
||||
|
||||
# E2E users should be present
|
||||
assert f"e2e-user-001-{unique}" in users_by_id, f"e2e-user-001-{unique} not in {list(users_by_id.keys())}"
|
||||
assert f"e2e-user-002-{unique}" in users_by_id
|
||||
assert f"e2e-user-003-{unique}" in users_by_id
|
||||
|
||||
# Engineer 1: 32/40 = 80% = warning
|
||||
eng1 = users_by_id[f"e2e-user-001-{unique}"]
|
||||
assert Decimal(eng1["allocated_hours"]) == Decimal("32")
|
||||
assert eng1["load_level"] == "warning"
|
||||
|
||||
# Engineer 2: 48/40 = 120% = overloaded
|
||||
eng2 = users_by_id[f"e2e-user-002-{unique}"]
|
||||
assert Decimal(eng2["allocated_hours"]) == Decimal("48")
|
||||
assert eng2["load_level"] == "overloaded"
|
||||
|
||||
# Ops user: 8/32 = 25% = normal
|
||||
ops = users_by_id[f"e2e-user-003-{unique}"]
|
||||
assert Decimal(ops["allocated_hours"]) == Decimal("8")
|
||||
assert ops["load_level"] == "normal"
|
||||
|
||||
def test_heatmap_department_filter(self, client, db, admin_token, mock_redis):
|
||||
"""Test heatmap with department filter."""
|
||||
data = self.setup_complete_environment(db, mock_redis)
|
||||
unique = data["unique"]
|
||||
dept_rd_id = data["dept_rd_id"]
|
||||
|
||||
response = client.get(
|
||||
f"/api/workload/heatmap?department_id={dept_rd_id}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
|
||||
# Should only include R&D users
|
||||
for user in result["users"]:
|
||||
assert user["department_id"] == dept_rd_id
|
||||
|
||||
# Should not include ops user
|
||||
user_ids = {u["user_id"] for u in result["users"]}
|
||||
assert f"e2e-user-003-{unique}" not in user_ids
|
||||
|
||||
def test_load_level_thresholds(self, client, db, admin_token, mock_redis):
|
||||
"""Test that load levels are correctly determined."""
|
||||
data = self.setup_complete_environment(db, mock_redis)
|
||||
unique = data["unique"]
|
||||
|
||||
# Filter by our specific test users to avoid interference from other tests
|
||||
user_ids = f"e2e-user-001-{unique},e2e-user-002-{unique},e2e-user-003-{unique}"
|
||||
response = client.get(
|
||||
f"/api/workload/heatmap?user_ids={user_ids}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
result = response.json()
|
||||
users_by_id = {u["user_id"]: u for u in result["users"]}
|
||||
|
||||
# Check E2E users are present
|
||||
assert f"e2e-user-001-{unique}" in users_by_id, f"e2e-user-001-{unique} not in {list(users_by_id.keys())}"
|
||||
assert f"e2e-user-002-{unique}" in users_by_id
|
||||
assert f"e2e-user-003-{unique}" in users_by_id
|
||||
|
||||
# Verify load levels based on percentage
|
||||
# Engineer 1: 80% -> warning
|
||||
assert users_by_id[f"e2e-user-001-{unique}"]["load_level"] == "warning"
|
||||
assert Decimal(users_by_id[f"e2e-user-001-{unique}"]["load_percentage"]) == Decimal("80.00")
|
||||
|
||||
# Engineer 2: 120% -> overloaded
|
||||
assert users_by_id[f"e2e-user-002-{unique}"]["load_level"] == "overloaded"
|
||||
assert Decimal(users_by_id[f"e2e-user-002-{unique}"]["load_percentage"]) == Decimal("120.00")
|
||||
|
||||
# Ops user: 25% -> normal
|
||||
assert users_by_id[f"e2e-user-003-{unique}"]["load_level"] == "normal"
|
||||
assert Decimal(users_by_id[f"e2e-user-003-{unique}"]["load_percentage"]) == Decimal("25.00")
|
||||
|
||||
|
||||
class TestWorkloadE2EAccessControl:
|
||||
"""E2E tests for workload access control."""
|
||||
|
||||
def setup_multi_department_env(self, db, mock_redis):
|
||||
"""Set up environment with multiple departments for access control tests."""
|
||||
import uuid
|
||||
unique = str(uuid.uuid4())[:8]
|
||||
|
||||
# Create departments with unique IDs
|
||||
dept_rd = Department(id=f"acl-dept-rd-{unique}", name=f"R&D ACL {unique}")
|
||||
dept_ops = Department(id=f"acl-dept-ops-{unique}", name=f"Operations ACL {unique}")
|
||||
db.add(dept_rd)
|
||||
db.add(dept_ops)
|
||||
|
||||
# Create users in different departments with unique IDs
|
||||
rd_user = User(
|
||||
id=f"acl-user-rd-{unique}",
|
||||
email=f"acl-rd-{unique}@test.com",
|
||||
name="R&D User ACL",
|
||||
department_id=f"acl-dept-rd-{unique}",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
)
|
||||
ops_user = User(
|
||||
id=f"acl-user-ops-{unique}",
|
||||
email=f"acl-ops-{unique}@test.com",
|
||||
name="Ops User ACL",
|
||||
department_id=f"acl-dept-ops-{unique}",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
)
|
||||
db.add(rd_user)
|
||||
db.add(ops_user)
|
||||
db.commit()
|
||||
|
||||
# Create tokens with unique user IDs
|
||||
rd_token_data = create_token_payload(
|
||||
user_id=f"acl-user-rd-{unique}",
|
||||
email=f"acl-rd-{unique}@test.com",
|
||||
role="engineer",
|
||||
department_id=f"acl-dept-rd-{unique}",
|
||||
is_system_admin=False,
|
||||
)
|
||||
rd_token = create_access_token(rd_token_data)
|
||||
mock_redis.setex(f"session:acl-user-rd-{unique}", 900, rd_token)
|
||||
|
||||
ops_token_data = create_token_payload(
|
||||
user_id=f"acl-user-ops-{unique}",
|
||||
email=f"acl-ops-{unique}@test.com",
|
||||
role="engineer",
|
||||
department_id=f"acl-dept-ops-{unique}",
|
||||
is_system_admin=False,
|
||||
)
|
||||
ops_token = create_access_token(ops_token_data)
|
||||
mock_redis.setex(f"session:acl-user-ops-{unique}", 900, ops_token)
|
||||
|
||||
return {
|
||||
"rd_user": rd_user,
|
||||
"ops_user": ops_user,
|
||||
"rd_token": rd_token,
|
||||
"ops_token": ops_token,
|
||||
"unique": unique,
|
||||
"rd_user_id": f"acl-user-rd-{unique}",
|
||||
"ops_user_id": f"acl-user-ops-{unique}",
|
||||
"dept_rd_id": f"acl-dept-rd-{unique}",
|
||||
"dept_ops_id": f"acl-dept-ops-{unique}",
|
||||
}
|
||||
|
||||
def test_admin_can_see_all_users(self, client, db, admin_token, mock_redis):
|
||||
"""Super admin can see workload for all users."""
|
||||
data = self.setup_multi_department_env(db, mock_redis)
|
||||
|
||||
# Filter by our specific test users to test admin access without interference
|
||||
user_ids_filter = f"{data['rd_user_id']},{data['ops_user_id']}"
|
||||
response = client.get(
|
||||
f"/api/workload/heatmap?user_ids={user_ids_filter}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
|
||||
# Admin sees users including ACL test users we just created
|
||||
user_ids = {u["user_id"] for u in result["users"]}
|
||||
assert len(user_ids) == 2
|
||||
assert data["rd_user_id"] in user_ids, f"{data['rd_user_id']} not in {user_ids}"
|
||||
assert data["ops_user_id"] in user_ids, f"{data['ops_user_id']} not in {user_ids}"
|
||||
|
||||
def test_regular_user_sees_only_self(self, client, db, mock_redis):
|
||||
"""Regular user can only see their own workload in heatmap."""
|
||||
data = self.setup_multi_department_env(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
"/api/workload/heatmap",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
|
||||
# Should only see themselves
|
||||
assert len(result["users"]) == 1
|
||||
assert result["users"][0]["user_id"] == data["rd_user_id"]
|
||||
|
||||
def test_regular_user_cannot_access_other_user_detail(self, client, db, mock_redis):
|
||||
"""Regular user cannot get another user's detailed workload."""
|
||||
data = self.setup_multi_department_env(db, mock_redis)
|
||||
|
||||
# R&D user tries to access Ops user's detail
|
||||
response = client.get(
|
||||
f"/api/workload/user/{data['ops_user_id']}",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
assert "Access denied" in response.json()["detail"]
|
||||
|
||||
def test_regular_user_cannot_access_other_department(self, client, db, mock_redis):
|
||||
"""Regular user cannot filter by other department."""
|
||||
data = self.setup_multi_department_env(db, mock_redis)
|
||||
|
||||
# R&D user tries to access Ops department
|
||||
response = client.get(
|
||||
f"/api/workload/heatmap?department_id={data['dept_ops_id']}",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
assert "other departments" in response.json()["detail"]
|
||||
|
||||
def test_user_can_access_own_detail(self, client, db, mock_redis):
|
||||
"""User can access their own detailed workload."""
|
||||
data = self.setup_multi_department_env(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
f"/api/workload/user/{data['rd_user_id']}",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
assert result["user_id"] == data["rd_user_id"]
|
||||
|
||||
def test_my_workload_endpoint(self, client, db, mock_redis):
|
||||
"""The /me endpoint returns current user's workload."""
|
||||
data = self.setup_multi_department_env(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
"/api/workload/me",
|
||||
headers={"Authorization": f"Bearer {data['rd_token']}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
assert result["user_id"] == data["rd_user_id"]
|
||||
assert "tasks" in result
|
||||
|
||||
|
||||
class TestWorkloadE2ECache:
|
||||
"""E2E tests for Redis cache behavior.
|
||||
|
||||
Note: The cache service imports redis_client directly from the module,
|
||||
so these tests verify the cache logic through unit tests rather than
|
||||
end-to-end integration. Full cache testing would require mocking at
|
||||
the module level or refactoring to use dependency injection.
|
||||
"""
|
||||
|
||||
def test_cache_functions_work(self):
|
||||
"""Test that cache helper functions work correctly."""
|
||||
from app.services.workload_cache import (
|
||||
_make_heatmap_cache_key,
|
||||
_make_user_cache_key,
|
||||
_serialize_workload_summary,
|
||||
_deserialize_workload_summary,
|
||||
)
|
||||
from app.schemas.workload import UserWorkloadSummary, LoadLevel
|
||||
|
||||
# Test cache key generation
|
||||
week = date(2024, 1, 1)
|
||||
key = _make_heatmap_cache_key(week)
|
||||
assert "2024-01-01" in key
|
||||
assert key == "workload:heatmap:2024-01-01"
|
||||
|
||||
key_with_dept = _make_heatmap_cache_key(week, department_id="dept-1")
|
||||
assert "dept:dept-1" in key_with_dept
|
||||
|
||||
key_with_users = _make_heatmap_cache_key(week, user_ids=["user-1", "user-2"])
|
||||
assert "users:user-1,user-2" in key_with_users
|
||||
|
||||
user_key = _make_user_cache_key("user-123", week)
|
||||
assert user_key == "workload:user:user-123:2024-01-01"
|
||||
|
||||
def test_serialization_roundtrip(self):
|
||||
"""Test that serialization/deserialization preserves data."""
|
||||
from app.services.workload_cache import (
|
||||
_serialize_workload_summary,
|
||||
_deserialize_workload_summary,
|
||||
)
|
||||
from app.schemas.workload import UserWorkloadSummary, LoadLevel
|
||||
|
||||
original = UserWorkloadSummary(
|
||||
user_id="user-123",
|
||||
user_name="Test User",
|
||||
department_id="dept-1",
|
||||
department_name="R&D",
|
||||
capacity_hours=Decimal("40"),
|
||||
allocated_hours=Decimal("32.5"),
|
||||
load_percentage=Decimal("81.25"),
|
||||
load_level=LoadLevel.WARNING,
|
||||
task_count=5,
|
||||
)
|
||||
|
||||
serialized = _serialize_workload_summary(original)
|
||||
deserialized = _deserialize_workload_summary(serialized)
|
||||
|
||||
assert deserialized.user_id == original.user_id
|
||||
assert deserialized.user_name == original.user_name
|
||||
assert deserialized.capacity_hours == original.capacity_hours
|
||||
assert deserialized.allocated_hours == original.allocated_hours
|
||||
assert deserialized.load_percentage == original.load_percentage
|
||||
assert deserialized.load_level == original.load_level
|
||||
assert deserialized.task_count == original.task_count
|
||||
|
||||
def test_second_request_returns_same_data(self, client, db, admin_token, mock_redis):
|
||||
"""Second request should return identical data (testing idempotency)."""
|
||||
# First request
|
||||
response1 = client.get(
|
||||
"/api/workload/heatmap",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response1.status_code == 200
|
||||
result1 = response1.json()
|
||||
|
||||
# Second request - should return same data
|
||||
response2 = client.get(
|
||||
"/api/workload/heatmap",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response2.status_code == 200
|
||||
result2 = response2.json()
|
||||
|
||||
# Results should be identical (whether from cache or recalculated)
|
||||
assert result1 == result2
|
||||
|
||||
|
||||
class TestWorkloadE2EUserDetail:
|
||||
"""E2E tests for user workload detail endpoint."""
|
||||
|
||||
def setup_detail_test_env(self, db, mock_redis):
|
||||
"""Set up environment for detail testing."""
|
||||
import uuid
|
||||
unique = str(uuid.uuid4())[:8]
|
||||
|
||||
# Create user with unique ID
|
||||
user = User(
|
||||
id=f"user-detail-{unique}",
|
||||
email=f"detail-{unique}@test.com",
|
||||
name="Detail Test User",
|
||||
role_id="00000000-0000-0000-0000-000000000003",
|
||||
capacity=40,
|
||||
is_active=True,
|
||||
)
|
||||
db.add(user)
|
||||
|
||||
# Create space and project with unique IDs
|
||||
space = Space(
|
||||
id=f"space-detail-{unique}",
|
||||
name=f"Detail Space {unique}",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(space)
|
||||
|
||||
project = Project(
|
||||
id=f"project-detail-{unique}",
|
||||
space_id=f"space-detail-{unique}",
|
||||
title=f"Detail Project {unique}",
|
||||
owner_id="00000000-0000-0000-0000-000000000001",
|
||||
)
|
||||
db.add(project)
|
||||
|
||||
status = TaskStatus(
|
||||
id=f"status-detail-{unique}",
|
||||
project_id=f"project-detail-{unique}",
|
||||
name="In Progress",
|
||||
is_done=False,
|
||||
)
|
||||
db.add(status)
|
||||
|
||||
db.commit()
|
||||
|
||||
# Create tasks
|
||||
week_start = get_week_bounds(date.today())[0]
|
||||
due = datetime.combine(week_start, datetime.min.time()) + timedelta(days=2)
|
||||
|
||||
tasks = []
|
||||
for i in range(3):
|
||||
task = Task(
|
||||
id=f"task-detail-{i}-{unique}",
|
||||
project_id=f"project-detail-{unique}",
|
||||
title=f"Task {i}",
|
||||
assignee_id=f"user-detail-{unique}",
|
||||
status_id=f"status-detail-{unique}",
|
||||
original_estimate=Decimal("8"),
|
||||
due_date=due,
|
||||
created_by="00000000-0000-0000-0000-000000000001",
|
||||
)
|
||||
db.add(task)
|
||||
tasks.append(task)
|
||||
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"user": user,
|
||||
"tasks": tasks,
|
||||
"project": project,
|
||||
"unique": unique,
|
||||
"user_id": f"user-detail-{unique}",
|
||||
}
|
||||
|
||||
def test_detail_includes_task_list(self, client, db, admin_token, mock_redis):
|
||||
"""User detail should include list of tasks."""
|
||||
data = self.setup_detail_test_env(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
f"/api/workload/user/{data['user_id']}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
result = response.json()
|
||||
|
||||
# Should include 3 tasks
|
||||
assert len(result["tasks"]) == 3
|
||||
|
||||
# Each task should have required fields
|
||||
for task in result["tasks"]:
|
||||
assert "task_id" in task
|
||||
assert "title" in task
|
||||
assert "project_name" in task
|
||||
assert "original_estimate" in task
|
||||
|
||||
def test_detail_calculates_total_correctly(self, client, db, admin_token, mock_redis):
|
||||
"""Total allocated hours should sum task estimates."""
|
||||
data = self.setup_detail_test_env(db, mock_redis)
|
||||
|
||||
response = client.get(
|
||||
f"/api/workload/user/{data['user_id']}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
result = response.json()
|
||||
|
||||
# 3 tasks × 8 hours = 24 hours
|
||||
assert Decimal(result["allocated_hours"]) == Decimal("24")
|
||||
# 24/40 = 60%
|
||||
assert Decimal(result["load_percentage"]) == Decimal("60.00")
|
||||
assert result["load_level"] == "normal"
|
||||
|
||||
def test_nonexistent_user_returns_404(self, client, db, admin_token, mock_redis):
|
||||
"""Requesting nonexistent user should return 404."""
|
||||
response = client.get(
|
||||
"/api/workload/user/nonexistent-user",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
Reference in New Issue
Block a user