feat: implement dashboard widgets functionality

Backend:
- Add dashboard API router with widget endpoints
- Create dashboard schemas for widget data
- Add dashboard tests

Frontend:
- Enhance Dashboard page with widget components
- Add dashboard service for API calls
- Create reusable dashboard components

OpenSpec:
- Archive add-dashboard-widgets change
- Add dashboard capability specs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2026-01-08 22:52:28 +08:00
parent 3d678ba5b0
commit 4860704543
17 changed files with 2152 additions and 28 deletions

View File

@@ -0,0 +1,4 @@
"""Dashboard API module."""
from app.api.dashboard.router import router
__all__ = ["router"]

View File

@@ -0,0 +1,222 @@
"""Dashboard API endpoints.
Provides a single aggregated endpoint for dashboard data,
combining task statistics, workload summary, and project health.
"""
from datetime import datetime, timedelta
from decimal import Decimal
from fastapi import APIRouter, Depends
from sqlalchemy import func, and_, or_
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.middleware.auth import get_current_user
from app.models import User, Task
from app.models.task_status import TaskStatus
from app.schemas.dashboard import (
DashboardResponse,
TaskStatistics,
WorkloadSummary,
HealthSummary,
)
from app.schemas.workload import LoadLevel
from app.services.workload_service import (
get_week_bounds,
get_current_week_start,
calculate_load_percentage,
determine_load_level,
)
from app.services.health_service import HealthService
router = APIRouter()
def get_task_statistics(db: Session, user_id: str) -> TaskStatistics:
"""
Calculate task statistics for a user.
Args:
db: Database session
user_id: User ID to calculate statistics for
Returns:
TaskStatistics with counts and completion rate
"""
now = datetime.utcnow()
week_start, week_end = get_week_bounds(now.date())
week_start_dt = datetime.combine(week_start, datetime.min.time())
week_end_dt = datetime.combine(week_end, datetime.max.time())
# Query all tasks assigned to user (not deleted)
base_query = db.query(Task).filter(
Task.assignee_id == user_id,
Task.is_deleted == False,
)
# Count total tasks (not done) assigned to user
# We need to join with status to check is_done
assigned_count = (
base_query
.outerjoin(TaskStatus, Task.status_id == TaskStatus.id)
.filter(
or_(
TaskStatus.is_done == False,
Task.status_id == None
)
)
.count()
)
# Count tasks due this week (not completed)
due_this_week = (
base_query
.outerjoin(TaskStatus, Task.status_id == TaskStatus.id)
.filter(
Task.due_date >= week_start_dt,
Task.due_date <= week_end_dt,
or_(
TaskStatus.is_done == False,
Task.status_id == None
)
)
.count()
)
# Count overdue tasks (past due_date, not completed)
overdue_count = (
base_query
.outerjoin(TaskStatus, Task.status_id == TaskStatus.id)
.filter(
Task.due_date < now,
or_(
TaskStatus.is_done == False,
Task.status_id == None
)
)
.count()
)
# Calculate completion rate
# Total tasks (including completed) assigned to user
total_tasks = base_query.count()
# Completed tasks
completed_tasks = (
base_query
.join(TaskStatus, Task.status_id == TaskStatus.id)
.filter(TaskStatus.is_done == True)
.count()
)
completion_rate = 0.0
if total_tasks > 0:
completion_rate = round((completed_tasks / total_tasks) * 100, 1)
return TaskStatistics(
assigned_count=assigned_count,
due_this_week=due_this_week,
overdue_count=overdue_count,
completion_rate=completion_rate,
)
def get_workload_summary(db: Session, user: User) -> WorkloadSummary:
"""
Get workload summary for a user for the current week.
Args:
db: Database session
user: User object
Returns:
WorkloadSummary with hours and load level
"""
now = datetime.utcnow()
week_start, week_end = get_week_bounds(now.date())
week_start_dt = datetime.combine(week_start, datetime.min.time())
week_end_dt = datetime.combine(week_end, datetime.max.time())
# Get tasks due this week for user (not completed)
tasks = (
db.query(Task)
.outerjoin(TaskStatus, Task.status_id == TaskStatus.id)
.filter(
Task.assignee_id == user.id,
Task.is_deleted == False,
Task.due_date >= week_start_dt,
Task.due_date <= week_end_dt,
or_(
TaskStatus.is_done == False,
Task.status_id == None
)
)
.all()
)
# Calculate allocated hours from original_estimate
allocated_hours = Decimal("0")
for task in tasks:
if task.original_estimate:
allocated_hours += task.original_estimate
capacity_hours = Decimal(str(user.capacity)) if user.capacity else Decimal("40")
load_percentage = calculate_load_percentage(allocated_hours, capacity_hours)
load_level = determine_load_level(load_percentage)
return WorkloadSummary(
allocated_hours=allocated_hours,
capacity_hours=capacity_hours,
load_percentage=load_percentage,
load_level=load_level,
)
def get_health_summary(db: Session) -> HealthSummary:
"""
Get aggregated project health summary.
Args:
db: Database session
Returns:
HealthSummary with project health breakdown
"""
health_service = HealthService(db)
dashboard = health_service.get_dashboard(status_filter="active")
return HealthSummary(
total_projects=dashboard.summary.total_projects,
healthy_count=dashboard.summary.healthy_count,
at_risk_count=dashboard.summary.at_risk_count,
critical_count=dashboard.summary.critical_count,
average_health_score=dashboard.summary.average_health_score,
)
@router.get("", response_model=DashboardResponse)
async def get_dashboard(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Get aggregated dashboard data for the current user.
Returns a single response containing:
- **task_stats**: User's task statistics (assigned, due this week, overdue, completion rate)
- **workload**: Current week workload summary (hours, load level)
- **health_summary**: Aggregated project health metrics
This endpoint combines multiple data sources into a single response
to minimize frontend API calls and ensure data consistency.
"""
# Calculate all dashboard components
task_stats = get_task_statistics(db, current_user.id)
workload = get_workload_summary(db, current_user)
health_summary = get_health_summary(db)
return DashboardResponse(
task_stats=task_stats,
workload=workload,
health_summary=health_summary,
)

View File

@@ -37,6 +37,7 @@ from app.api.health import router as health_router
from app.api.custom_fields import router as custom_fields_router
from app.api.task_dependencies import router as task_dependencies_router
from app.api.admin import encryption_keys as admin_encryption_keys_router
from app.api.dashboard import router as dashboard_router
from app.core.config import settings
app = FastAPI(
@@ -82,6 +83,7 @@ app.include_router(health_router)
app.include_router(custom_fields_router)
app.include_router(task_dependencies_router)
app.include_router(admin_encryption_keys_router.router)
app.include_router(dashboard_router, prefix="/api/dashboard", tags=["Dashboard"])
@app.get("/health")

View File

@@ -0,0 +1,72 @@
"""Dashboard API schemas.
Defines response models for the dashboard endpoint that aggregates
task statistics, workload, and project health summaries.
"""
from pydantic import BaseModel
from decimal import Decimal
from typing import Optional
from app.schemas.workload import LoadLevel
class TaskStatistics(BaseModel):
"""User's task statistics for dashboard display.
Attributes:
assigned_count: Total tasks assigned to user (not completed)
due_this_week: Tasks with due_date in current week
overdue_count: Tasks past due_date, not completed
completion_rate: Percentage of completed tasks (0-100)
"""
assigned_count: int
due_this_week: int
overdue_count: int
completion_rate: float
class WorkloadSummary(BaseModel):
"""User's workload summary for dashboard display.
Attributes:
allocated_hours: Total estimated hours from tasks due this week
capacity_hours: User's weekly capacity
load_percentage: Percentage of capacity used
load_level: normal (<80%), warning (80-99%), overloaded (>=100%)
"""
allocated_hours: Decimal
capacity_hours: Decimal
load_percentage: Optional[Decimal] = None
load_level: LoadLevel
class HealthSummary(BaseModel):
"""Aggregated project health summary for dashboard display.
Attributes:
total_projects: Total number of active projects
healthy_count: Projects with health_score >= 80
at_risk_count: Projects with health_score 50-79
critical_count: Projects with health_score < 50
average_health_score: Average health score across all projects
"""
total_projects: int
healthy_count: int
at_risk_count: int
critical_count: int
average_health_score: float
class DashboardResponse(BaseModel):
"""Complete dashboard response aggregating all widgets.
Single endpoint response that combines:
- Task statistics for the current user
- Current week workload summary
- Project health summary
This minimizes frontend API calls and ensures data consistency.
"""
task_stats: TaskStatistics
workload: WorkloadSummary
health_summary: HealthSummary

View File

@@ -0,0 +1,635 @@
"""Tests for dashboard API and service functions."""
import pytest
from datetime import datetime, timedelta
from decimal import Decimal
from app.models import User, Department, Space, Project, Task
from app.models.task_status import TaskStatus
from app.api.dashboard.router import (
get_task_statistics,
get_workload_summary,
get_health_summary,
)
from app.schemas.workload import LoadLevel
class TestTaskStatistics:
"""Tests for task statistics calculation."""
def setup_test_data(self, db):
"""Set up test data for task statistics tests."""
# Create department
dept = Department(
id="dept-dash-001",
name="Dashboard Test Department",
)
db.add(dept)
# Create test user
user = User(
id="user-dash-001",
email="dashboard@test.com",
name="Dashboard Test User",
department_id="dept-dash-001",
role_id="00000000-0000-0000-0000-000000000003",
capacity=40,
is_active=True,
is_system_admin=False,
)
db.add(user)
# Create space
space = Space(
id="space-dash-001",
name="Dashboard Test Space",
owner_id="00000000-0000-0000-0000-000000000001",
is_active=True,
)
db.add(space)
# Create project
project = Project(
id="project-dash-001",
space_id="space-dash-001",
title="Dashboard Test Project",
owner_id="00000000-0000-0000-0000-000000000001",
department_id="dept-dash-001",
security_level="department",
status="active",
)
db.add(project)
# Create task statuses
status_todo = TaskStatus(
id="status-dash-todo",
project_id="project-dash-001",
name="To Do",
is_done=False,
)
db.add(status_todo)
status_done = TaskStatus(
id="status-dash-done",
project_id="project-dash-001",
name="Done",
is_done=True,
)
db.add(status_done)
db.commit()
return {
"department": dept,
"user": user,
"space": space,
"project": project,
"status_todo": status_todo,
"status_done": status_done,
}
def create_task(
self,
db,
data,
task_id,
done=False,
overdue=False,
due_this_week=False,
estimate=None,
):
"""Helper to create a task with optional characteristics."""
now = datetime.utcnow()
if overdue:
due_date = now - timedelta(days=3)
elif due_this_week:
# Due in the middle of current week
due_date = now + timedelta(days=2)
else:
# Due next week
due_date = now + timedelta(days=10)
task = Task(
id=task_id,
project_id=data["project"].id,
title=f"Task {task_id}",
assignee_id=data["user"].id,
status_id=data["status_done"].id if done else data["status_todo"].id,
original_estimate=estimate,
due_date=due_date,
created_by="00000000-0000-0000-0000-000000000001",
is_deleted=False,
)
db.add(task)
db.commit()
return task
def test_empty_statistics(self, db):
"""User with no tasks should have zero counts."""
data = self.setup_test_data(db)
stats = get_task_statistics(db, data["user"].id)
assert stats.assigned_count == 0
assert stats.due_this_week == 0
assert stats.overdue_count == 0
assert stats.completion_rate == 0.0
def test_assigned_count(self, db):
"""Should count non-completed tasks assigned to user."""
data = self.setup_test_data(db)
# Create 3 tasks: 2 active, 1 completed
self.create_task(db, data, "task-1", done=False)
self.create_task(db, data, "task-2", done=False)
self.create_task(db, data, "task-3", done=True)
stats = get_task_statistics(db, data["user"].id)
assert stats.assigned_count == 2 # Only non-completed
def test_due_this_week_count(self, db):
"""Should count tasks due this week."""
data = self.setup_test_data(db)
# Create tasks with different due dates
self.create_task(db, data, "task-1", due_this_week=True)
self.create_task(db, data, "task-2", due_this_week=True)
self.create_task(db, data, "task-3", due_this_week=False) # Next week
stats = get_task_statistics(db, data["user"].id)
assert stats.due_this_week == 2
def test_overdue_count(self, db):
"""Should count overdue tasks."""
data = self.setup_test_data(db)
# Create overdue and non-overdue tasks
self.create_task(db, data, "task-1", overdue=True)
self.create_task(db, data, "task-2", overdue=True)
self.create_task(db, data, "task-3", overdue=False)
stats = get_task_statistics(db, data["user"].id)
assert stats.overdue_count == 2
def test_overdue_completed_not_counted(self, db):
"""Completed overdue tasks should not be counted as overdue."""
data = self.setup_test_data(db)
# Create overdue task that is completed
self.create_task(db, data, "task-1", overdue=True, done=True)
self.create_task(db, data, "task-2", overdue=True, done=False)
stats = get_task_statistics(db, data["user"].id)
assert stats.overdue_count == 1
def test_completion_rate(self, db):
"""Should calculate correct completion rate."""
data = self.setup_test_data(db)
# Create 4 tasks: 1 completed, 3 active = 25%
self.create_task(db, data, "task-1", done=True)
self.create_task(db, data, "task-2", done=False)
self.create_task(db, data, "task-3", done=False)
self.create_task(db, data, "task-4", done=False)
stats = get_task_statistics(db, data["user"].id)
assert stats.completion_rate == 25.0
def test_deleted_tasks_excluded(self, db):
"""Soft-deleted tasks should not be counted."""
data = self.setup_test_data(db)
# Create normal task
self.create_task(db, data, "task-1")
# Create deleted task
deleted_task = Task(
id="task-deleted",
project_id=data["project"].id,
title="Deleted Task",
assignee_id=data["user"].id,
status_id=data["status_todo"].id,
due_date=datetime.utcnow() - timedelta(days=5), # Overdue
created_by="00000000-0000-0000-0000-000000000001",
is_deleted=True,
deleted_at=datetime.utcnow(),
)
db.add(deleted_task)
db.commit()
stats = get_task_statistics(db, data["user"].id)
assert stats.assigned_count == 1
assert stats.overdue_count == 0 # Deleted task not counted
class TestWorkloadSummary:
"""Tests for workload summary calculation."""
def setup_test_data(self, db):
"""Set up test data for workload summary tests."""
# Create department
dept = Department(
id="dept-wl-001",
name="Workload Test Department",
)
db.add(dept)
# Create test user with 40h capacity
user = User(
id="user-wl-001",
email="workload@test.com",
name="Workload Test User",
department_id="dept-wl-001",
role_id="00000000-0000-0000-0000-000000000003",
capacity=40,
is_active=True,
is_system_admin=False,
)
db.add(user)
# Create space
space = Space(
id="space-wl-001",
name="Workload Test Space",
owner_id="00000000-0000-0000-0000-000000000001",
is_active=True,
)
db.add(space)
# Create project
project = Project(
id="project-wl-001",
space_id="space-wl-001",
title="Workload Test Project",
owner_id="00000000-0000-0000-0000-000000000001",
department_id="dept-wl-001",
security_level="department",
status="active",
)
db.add(project)
# Create task status
status_todo = TaskStatus(
id="status-wl-todo",
project_id="project-wl-001",
name="To Do",
is_done=False,
)
db.add(status_todo)
status_done = TaskStatus(
id="status-wl-done",
project_id="project-wl-001",
name="Done",
is_done=True,
)
db.add(status_done)
db.commit()
return {
"department": dept,
"user": user,
"space": space,
"project": project,
"status_todo": status_todo,
"status_done": status_done,
}
def test_empty_workload(self, db):
"""User with no tasks should have zero allocated hours."""
data = self.setup_test_data(db)
workload = get_workload_summary(db, data["user"])
assert workload.allocated_hours == Decimal("0")
assert workload.capacity_hours == Decimal("40")
assert workload.load_percentage == Decimal("0.00")
assert workload.load_level == LoadLevel.NORMAL
def test_workload_with_tasks(self, db):
"""Should calculate correct allocated hours."""
data = self.setup_test_data(db)
# Create tasks due this week with estimates
now = datetime.utcnow()
due_date = now + timedelta(days=2)
task1 = Task(
id="task-wl-1",
project_id=data["project"].id,
title="Task 1",
assignee_id=data["user"].id,
status_id=data["status_todo"].id,
original_estimate=Decimal("16"),
due_date=due_date,
created_by="00000000-0000-0000-0000-000000000001",
is_deleted=False,
)
db.add(task1)
task2 = Task(
id="task-wl-2",
project_id=data["project"].id,
title="Task 2",
assignee_id=data["user"].id,
status_id=data["status_todo"].id,
original_estimate=Decimal("16"),
due_date=due_date,
created_by="00000000-0000-0000-0000-000000000001",
is_deleted=False,
)
db.add(task2)
db.commit()
workload = get_workload_summary(db, data["user"])
assert workload.allocated_hours == Decimal("32")
assert workload.load_percentage == Decimal("80.00")
assert workload.load_level == LoadLevel.WARNING
def test_workload_overloaded(self, db):
"""User with more than capacity should be overloaded."""
data = self.setup_test_data(db)
now = datetime.utcnow()
due_date = now + timedelta(days=2)
# Create task with 48h estimate (> 40h capacity)
task = Task(
id="task-wl-over",
project_id=data["project"].id,
title="Big Task",
assignee_id=data["user"].id,
status_id=data["status_todo"].id,
original_estimate=Decimal("48"),
due_date=due_date,
created_by="00000000-0000-0000-0000-000000000001",
is_deleted=False,
)
db.add(task)
db.commit()
workload = get_workload_summary(db, data["user"])
assert workload.allocated_hours == Decimal("48")
assert workload.load_percentage == Decimal("120.00")
assert workload.load_level == LoadLevel.OVERLOADED
def test_completed_tasks_excluded(self, db):
"""Completed tasks should not count toward workload."""
data = self.setup_test_data(db)
now = datetime.utcnow()
due_date = now + timedelta(days=2)
# Create completed task
task = Task(
id="task-wl-done",
project_id=data["project"].id,
title="Done Task",
assignee_id=data["user"].id,
status_id=data["status_done"].id,
original_estimate=Decimal("24"),
due_date=due_date,
created_by="00000000-0000-0000-0000-000000000001",
is_deleted=False,
)
db.add(task)
db.commit()
workload = get_workload_summary(db, data["user"])
assert workload.allocated_hours == Decimal("0")
class TestHealthSummary:
"""Tests for health summary aggregation."""
def setup_test_data(self, db):
"""Set up test data for health summary tests."""
# Create department
dept = Department(
id="dept-hs-001",
name="Health Summary Test Department",
)
db.add(dept)
# Create space
space = Space(
id="space-hs-001",
name="Health Summary Test Space",
owner_id="00000000-0000-0000-0000-000000000001",
is_active=True,
)
db.add(space)
# Create active project
project = Project(
id="project-hs-001",
space_id="space-hs-001",
title="Health Test Project",
owner_id="00000000-0000-0000-0000-000000000001",
department_id="dept-hs-001",
security_level="department",
status="active",
)
db.add(project)
db.commit()
return {
"department": dept,
"space": space,
"project": project,
}
def test_health_summary_structure(self, db):
"""Health summary should have correct structure."""
data = self.setup_test_data(db)
summary = get_health_summary(db)
assert summary.total_projects >= 1
assert summary.healthy_count >= 0
assert summary.at_risk_count >= 0
assert summary.critical_count >= 0
assert summary.average_health_score >= 0
assert summary.average_health_score <= 100
class TestDashboardAPI:
"""Tests for dashboard API endpoint."""
def setup_test_data(self, db):
"""Set up test data for dashboard API tests."""
# Create department
dept = Department(
id="dept-api-dash-001",
name="API Dashboard Test Department",
)
db.add(dept)
# Create space
space = Space(
id="space-api-dash-001",
name="API Dashboard Test Space",
owner_id="00000000-0000-0000-0000-000000000001",
is_active=True,
)
db.add(space)
# Create project
project = Project(
id="project-api-dash-001",
space_id="space-api-dash-001",
title="API Dashboard Test Project",
owner_id="00000000-0000-0000-0000-000000000001",
department_id="dept-api-dash-001",
security_level="department",
status="active",
)
db.add(project)
# Create task status
status_todo = TaskStatus(
id="status-api-dash-todo",
project_id="project-api-dash-001",
name="To Do",
is_done=False,
)
db.add(status_todo)
# Create a task for the admin user
now = datetime.utcnow()
task = Task(
id="task-api-dash-001",
project_id="project-api-dash-001",
title="Admin Task",
assignee_id="00000000-0000-0000-0000-000000000001",
status_id="status-api-dash-todo",
original_estimate=Decimal("8"),
due_date=now + timedelta(days=2),
created_by="00000000-0000-0000-0000-000000000001",
is_deleted=False,
)
db.add(task)
db.commit()
return {
"department": dept,
"space": space,
"project": project,
"task": task,
}
def test_get_dashboard(self, client, db, admin_token):
"""Should return complete dashboard data."""
data = self.setup_test_data(db)
response = client.get(
"/api/dashboard",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
result = response.json()
# Check structure
assert "task_stats" in result
assert "workload" in result
assert "health_summary" in result
def test_dashboard_task_stats_fields(self, client, db, admin_token):
"""Dashboard task_stats should include all expected fields."""
data = self.setup_test_data(db)
response = client.get(
"/api/dashboard",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
task_stats = response.json()["task_stats"]
assert "assigned_count" in task_stats
assert "due_this_week" in task_stats
assert "overdue_count" in task_stats
assert "completion_rate" in task_stats
def test_dashboard_workload_fields(self, client, db, admin_token):
"""Dashboard workload should include all expected fields."""
data = self.setup_test_data(db)
response = client.get(
"/api/dashboard",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
workload = response.json()["workload"]
assert "allocated_hours" in workload
assert "capacity_hours" in workload
assert "load_percentage" in workload
assert "load_level" in workload
def test_dashboard_health_summary_fields(self, client, db, admin_token):
"""Dashboard health_summary should include all expected fields."""
data = self.setup_test_data(db)
response = client.get(
"/api/dashboard",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
health_summary = response.json()["health_summary"]
assert "total_projects" in health_summary
assert "healthy_count" in health_summary
assert "at_risk_count" in health_summary
assert "critical_count" in health_summary
assert "average_health_score" in health_summary
def test_dashboard_unauthorized(self, client, db):
"""Unauthenticated requests should fail."""
response = client.get("/api/dashboard")
assert response.status_code == 403
def test_dashboard_with_user_tasks(self, client, db, admin_token):
"""Dashboard should reflect user's tasks correctly."""
data = self.setup_test_data(db)
response = client.get(
"/api/dashboard",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
result = response.json()
# Admin has 1 task assigned (created in setup)
assert result["task_stats"]["assigned_count"] >= 1
assert result["task_stats"]["due_this_week"] >= 1
def test_dashboard_workload_load_level_values(self, client, db, admin_token):
"""Workload load_level should be a valid enum value."""
data = self.setup_test_data(db)
response = client.get(
"/api/dashboard",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
load_level = response.json()["workload"]["load_level"]
assert load_level in ["normal", "warning", "overloaded", "unavailable"]