Files
2026-01-11 08:37:21 +08:00

223 lines
6.4 KiB
Python

"""Dashboard API endpoints.
Provides a single aggregated endpoint for dashboard data,
combining task statistics, workload summary, and project health.
"""
from datetime import datetime, timedelta
from decimal import Decimal
from fastapi import APIRouter, Depends
from sqlalchemy import func, and_, or_
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.middleware.auth import get_current_user
from app.models import User, Task
from app.models.task_status import TaskStatus
from app.schemas.dashboard import (
DashboardResponse,
TaskStatistics,
WorkloadSummary,
HealthSummary,
)
from app.schemas.workload import LoadLevel
from app.services.workload_service import (
get_week_bounds,
get_current_week_start,
calculate_load_percentage,
determine_load_level,
)
from app.services.health_service import HealthService
# Router on which this module's dashboard endpoint is registered.
router = APIRouter()
def get_task_statistics(db: Session, user_id: str) -> TaskStatistics:
    """Build the task statistics for one user's dashboard.

    Only tasks assigned to ``user_id`` whose ``is_deleted`` flag is false are
    considered. A task is treated as "open" when its status row has
    ``is_done`` false, or when the task has no status at all.

    Args:
        db: Database session.
        user_id: ID of the user whose tasks are counted.

    Returns:
        TaskStatistics holding the number of open tasks, the open tasks whose
        due date falls inside the bounds returned by ``get_week_bounds`` for
        today's UTC date, the open tasks whose due date is already in the
        past, and the completion rate as a percentage rounded to one decimal.
    """
    current_time = datetime.utcnow()
    first_day, last_day = get_week_bounds(current_time.date())
    window_start = datetime.combine(first_day, datetime.min.time())
    window_end = datetime.combine(last_day, datetime.max.time())

    # Every count below starts from the user's non-deleted tasks.
    user_tasks = db.query(Task).filter(
        Task.assignee_id == user_id,
        Task.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )

    # "Open" means: status not flagged done, or no status assigned.
    still_open = or_(
        TaskStatus.is_done == False,  # noqa: E712 - builds a SQL expression
        Task.status_id == None,  # noqa: E711 - renders as IS NULL
    )

    def count_open(*criteria) -> int:
        """Count the user's open tasks that also satisfy ``criteria``."""
        # Outer join so that tasks without a status row are kept.
        joined = user_tasks.outerjoin(
            TaskStatus, Task.status_id == TaskStatus.id
        )
        return joined.filter(*criteria, still_open).count()

    open_total = count_open()
    open_due_in_week = count_open(
        Task.due_date >= window_start,
        Task.due_date <= window_end,
    )
    open_overdue = count_open(Task.due_date < current_time)

    # Completion rate = done tasks / all (non-deleted) tasks of the user.
    everything = user_tasks.count()
    finished_query = user_tasks.join(
        TaskStatus, Task.status_id == TaskStatus.id
    ).filter(TaskStatus.is_done == True)  # noqa: E712 - builds a SQL expression
    finished = finished_query.count()

    # Guard against division by zero when the user has no tasks.
    rate = round((finished / everything) * 100, 1) if everything > 0 else 0.0

    return TaskStatistics(
        assigned_count=open_total,
        due_this_week=open_due_in_week,
        overdue_count=open_overdue,
        completion_rate=rate,
    )
def get_workload_summary(db: Session, user: User) -> WorkloadSummary:
    """Summarise a user's workload for the current week.

    The allocated hours are the sum of ``original_estimate`` over the user's
    non-deleted, not-done tasks whose due date lies inside the bounds
    returned by ``get_week_bounds`` for today's UTC date.

    Args:
        db: Database session.
        user: User whose workload is summarised.

    Returns:
        WorkloadSummary with allocated hours, capacity hours, load
        percentage and load level.
    """
    first_day, last_day = get_week_bounds(datetime.utcnow().date())
    window_start = datetime.combine(first_day, datetime.min.time())
    window_end = datetime.combine(last_day, datetime.max.time())

    # Outer join keeps tasks that have no status row; those count as open.
    query = db.query(Task).outerjoin(
        TaskStatus, Task.status_id == TaskStatus.id
    )
    query = query.filter(
        Task.assignee_id == user.id,
        Task.is_deleted == False,  # noqa: E712 - builds a SQL expression
        Task.due_date >= window_start,
        Task.due_date <= window_end,
        or_(
            TaskStatus.is_done == False,  # noqa: E712 - builds a SQL expression
            Task.status_id == None,  # noqa: E711 - renders as IS NULL
        ),
    )
    week_tasks = query.all()

    # Tasks with a missing (or zero) estimate contribute nothing.
    allocated_hours = sum(
        (item.original_estimate for item in week_tasks if item.original_estimate),
        Decimal("0"),
    )

    # Fall back to 40 hours when the user has no capacity configured.
    if user.capacity is None:
        capacity_hours = Decimal("40")
    else:
        capacity_hours = Decimal(str(user.capacity))

    load_percentage = calculate_load_percentage(allocated_hours, capacity_hours)

    return WorkloadSummary(
        allocated_hours=allocated_hours,
        capacity_hours=capacity_hours,
        load_percentage=load_percentage,
        load_level=determine_load_level(load_percentage),
    )
def get_health_summary(db: Session) -> HealthSummary:
    """Return the aggregated project health figures for the dashboard.

    Asks ``HealthService`` for its dashboard with ``status_filter="active"``
    and copies the summary counters into a ``HealthSummary``.

    Args:
        db: Database session.

    Returns:
        HealthSummary with the project totals, per-level counts and the
        average health score reported by the health service.
    """
    summary = HealthService(db).get_dashboard(status_filter="active").summary

    # Fields copied one-to-one from the service summary.
    copied_fields = (
        "total_projects",
        "healthy_count",
        "at_risk_count",
        "critical_count",
        "average_health_score",
    )
    return HealthSummary(
        **{field: getattr(summary, field) for field in copied_fields}
    )
def _collect_dashboard(db: Session, user: User) -> DashboardResponse:
    """Run the blocking dashboard queries and assemble the response.

    Synchronous on purpose: it is meant to be executed in a worker thread so
    the SQLAlchemy calls never run on the event loop.
    """
    return DashboardResponse(
        task_stats=get_task_statistics(db, user.id),
        workload=get_workload_summary(db, user),
        health_summary=get_health_summary(db),
    )


@router.get("", response_model=DashboardResponse)
async def get_dashboard(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Get aggregated dashboard data for the current user.

    Returns a single response containing:

    - **task_stats**: User's task statistics (assigned, due this week, overdue, completion rate)
    - **workload**: Current week workload summary (hours, load level)
    - **health_summary**: Aggregated project health metrics

    This endpoint combines multiple data sources into a single response
    to minimize frontend API calls and ensure data consistency.
    """
    # Imported locally so this handler stays self-contained.
    from fastapi.concurrency import run_in_threadpool

    # The helpers issue synchronous database queries. Calling them directly
    # inside an ``async def`` handler would block the event loop (and every
    # other in-flight request) until they finish, so run them in the
    # threadpool and await the result instead.
    # NOTE(review): assumes the session's connection may be used from a
    # worker thread (e.g. not SQLite with check_same_thread=True) - confirm.
    return await run_in_threadpool(_collect_dashboard, db, current_user)