Files
PROJECT-CONTORL/backend/app/services/workload_cache.py
beabigegg 61fe01cb6b feat: implement workload heatmap module
- Backend (FastAPI):
  - Workload heatmap API with load level calculation
  - User workload detail endpoint with task breakdown
  - Redis caching for workload calculations (1hr TTL)
  - Department isolation and access control
  - WorkloadSnapshot model for historical data
  - Alembic migration for workload_snapshots table

- API Endpoints:
  - GET /api/workload/heatmap - Team workload overview
  - GET /api/workload/user/{id} - User workload detail
  - GET /api/workload/me - Current user workload

- Load Levels:
  - normal: <80%, warning: 80% to <100%, overloaded: >=100%

- Tests:
  - 26 unit/API tests
  - 15 E2E automated tests
  - 77 total tests passing

- OpenSpec:
  - add-resource-workload change archived
  - resource-management spec updated

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 01:13:21 +08:00

164 lines
4.7 KiB
Python

"""Workload cache service using Redis.
Provides caching for workload calculations to improve API response times.
"""
import json
from datetime import date
from decimal import Decimal
from typing import Optional, List
from app.core.redis import redis_client
from app.schemas.workload import UserWorkloadSummary, LoadLevel
# Lifetime of cached workload entries, in seconds (60 min * 60 s = 1 hour).
WORKLOAD_CACHE_TTL = 60 * 60
def _make_heatmap_cache_key(
week_start: date,
department_id: Optional[str] = None,
user_ids: Optional[List[str]] = None,
) -> str:
"""Generate cache key for heatmap query."""
parts = ["workload", "heatmap", str(week_start)]
if department_id:
parts.append(f"dept:{department_id}")
if user_ids:
parts.append(f"users:{','.join(sorted(user_ids))}")
return ":".join(parts)
def _make_user_cache_key(user_id: str, week_start: date) -> str:
"""Generate cache key for user workload."""
return f"workload:user:{user_id}:{week_start}"
def _serialize_workload_summary(summary: UserWorkloadSummary) -> dict:
    """Convert a UserWorkloadSummary into a JSON-serializable dict.

    ``capacity_hours``, ``allocated_hours`` and ``load_percentage`` are stored
    as strings (``_deserialize_workload_summary`` rebuilds them with
    ``Decimal``), and ``load_level`` is stored as its enum ``.value``. The
    remaining fields are copied unchanged.

    Args:
        summary: Workload summary to serialize.

    Returns:
        A dict that can be passed to ``json.dumps``. ``load_percentage`` is
        ``None`` only when the summary's own ``load_percentage`` is ``None``.
    """
    load_percentage = summary.load_percentage
    return {
        "user_id": summary.user_id,
        "user_name": summary.user_name,
        "department_id": summary.department_id,
        "department_name": summary.department_name,
        "capacity_hours": str(summary.capacity_hours),
        "allocated_hours": str(summary.allocated_hours),
        # Compare against None explicitly: a zero percentage is falsy, so a
        # truthiness test would turn a genuine 0% load into None in the cache.
        "load_percentage": (
            str(load_percentage) if load_percentage is not None else None
        ),
        "load_level": summary.load_level.value,
        "task_count": summary.task_count,
    }
def _deserialize_workload_summary(data: dict) -> UserWorkloadSummary:
    """Rebuild a UserWorkloadSummary from its cached dict form.

    String-encoded hour values are parsed back into ``Decimal``, and the stored
    ``load_level`` value is turned back into a ``LoadLevel`` member. A missing
    (falsy) ``load_percentage`` is restored as ``None``.

    Args:
        data: Dict as produced by ``_serialize_workload_summary``.

    Returns:
        The reconstructed UserWorkloadSummary.
    """
    # Fields that are stored exactly as they appear on the model.
    fields = {
        name: data[name]
        for name in ("user_id", "user_name", "department_id", "department_name")
    }
    fields["capacity_hours"] = Decimal(data["capacity_hours"])
    fields["allocated_hours"] = Decimal(data["allocated_hours"])
    if data["load_percentage"]:
        fields["load_percentage"] = Decimal(data["load_percentage"])
    else:
        fields["load_percentage"] = None
    fields["load_level"] = LoadLevel(data["load_level"])
    fields["task_count"] = data["task_count"]
    return UserWorkloadSummary(**fields)
def get_cached_heatmap(
    week_start: date,
    department_id: Optional[str] = None,
    user_ids: Optional[List[str]] = None,
) -> Optional[List[UserWorkloadSummary]]:
    """
    Look up a previously cached heatmap result.

    Args:
        week_start: Start of the week the heatmap covers
        department_id: Department filter used for the query, if any
        user_ids: User ID filter used for the query, if any

    Returns:
        The cached list of UserWorkloadSummary, or None when Redis holds no
        (non-empty) value for this combination of arguments
    """
    raw = redis_client.get(
        _make_heatmap_cache_key(week_start, department_id, user_ids)
    )
    if not raw:
        return None
    return list(map(_deserialize_workload_summary, json.loads(raw)))
def set_cached_heatmap(
    week_start: date,
    summaries: List[UserWorkloadSummary],
    department_id: Optional[str] = None,
    user_ids: Optional[List[str]] = None,
) -> None:
    """
    Store a heatmap result in Redis for WORKLOAD_CACHE_TTL seconds.

    Args:
        week_start: Start of the week the heatmap covers
        summaries: Workload summaries that make up the heatmap
        department_id: Department filter used for the query, if any
        user_ids: User ID filter used for the query, if any
    """
    key = _make_heatmap_cache_key(week_start, department_id, user_ids)
    payload = json.dumps(list(map(_serialize_workload_summary, summaries)))
    redis_client.setex(key, WORKLOAD_CACHE_TTL, payload)
def get_cached_user_workload(
    user_id: str,
    week_start: date,
) -> Optional[UserWorkloadSummary]:
    """
    Look up a previously cached workload summary for one user.

    Args:
        user_id: ID of the user
        week_start: Start of the week the summary covers

    Returns:
        The cached UserWorkloadSummary, or None when Redis holds no
        (non-empty) value for this user and week
    """
    raw = redis_client.get(_make_user_cache_key(user_id, week_start))
    if not raw:
        return None
    return _deserialize_workload_summary(json.loads(raw))
def set_cached_user_workload(
    user_id: str,
    week_start: date,
    summary: UserWorkloadSummary,
) -> None:
    """
    Store one user's workload summary in Redis for WORKLOAD_CACHE_TTL seconds.

    Args:
        user_id: ID of the user
        week_start: Start of the week the summary covers
        summary: Workload summary to cache
    """
    key = _make_user_cache_key(user_id, week_start)
    payload = json.dumps(_serialize_workload_summary(summary))
    redis_client.setex(key, WORKLOAD_CACHE_TTL, payload)
def invalidate_user_workload_cache(user_id: str) -> None:
    """
    Delete cached workload entries whose key matches ``workload:*:<user_id>:*``.

    Matching keys are found by iterating ``redis_client.scan_iter`` and are
    deleted one at a time, which may be slow for large datasets. For Phase 1,
    TTL expiration is relied on instead of active invalidation.

    NOTE(review): keys built by ``_make_heatmap_cache_key`` have the shape
    ``workload:heatmap:<week_start>...`` and do not appear to match this
    pattern, so heatmap entries that include the user are presumably left to
    expire via TTL - confirm this is intended.

    Args:
        user_id: ID of the user whose cached entries should be removed
    """
    key_pattern = "workload:*:{}:*".format(user_id)
    for matched_key in redis_client.scan_iter(match=key_pattern):
        redis_client.delete(matched_key)