import uuid from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.orm import Session from app.core.database import get_db from app.models import User, Project, Task, TaskStatus from app.schemas.task import ( TaskCreate, TaskUpdate, TaskResponse, TaskWithDetails, TaskListResponse, TaskStatusUpdate, TaskAssignUpdate ) from app.middleware.auth import ( get_current_user, check_project_access, check_task_access, check_task_edit_access ) router = APIRouter(tags=["tasks"]) # Maximum subtask depth MAX_SUBTASK_DEPTH = 2 def get_task_depth(db: Session, task: Task) -> int: """Calculate the depth of a task in the hierarchy.""" depth = 1 current = task while current.parent_task_id: depth += 1 current = db.query(Task).filter(Task.id == current.parent_task_id).first() if not current: break return depth def task_to_response(task: Task) -> TaskWithDetails: """Convert a Task model to TaskWithDetails response.""" return TaskWithDetails( id=task.id, project_id=task.project_id, parent_task_id=task.parent_task_id, title=task.title, description=task.description, priority=task.priority, original_estimate=task.original_estimate, time_spent=task.time_spent, due_date=task.due_date, assignee_id=task.assignee_id, status_id=task.status_id, blocker_flag=task.blocker_flag, position=task.position, created_by=task.created_by, created_at=task.created_at, updated_at=task.updated_at, assignee_name=task.assignee.name if task.assignee else None, status_name=task.status.name if task.status else None, status_color=task.status.color if task.status else None, creator_name=task.creator.name if task.creator else None, subtask_count=len(task.subtasks) if task.subtasks else 0, ) @router.get("/api/projects/{project_id}/tasks", response_model=TaskListResponse) async def list_tasks( project_id: str, parent_task_id: Optional[str] = Query(None, description="Filter by parent task"), status_id: Optional[str] = Query(None, description="Filter by status"), assignee_id: Optional[str] = Query(None, description="Filter by assignee"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ List all tasks in a project. """ project = db.query(Project).filter(Project.id == project_id).first() if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Project not found", ) if not check_project_access(current_user, project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied", ) query = db.query(Task).filter(Task.project_id == project_id) # Apply filters if parent_task_id is not None: if parent_task_id == "": # Root tasks only query = query.filter(Task.parent_task_id == None) else: query = query.filter(Task.parent_task_id == parent_task_id) else: # By default, show only root tasks query = query.filter(Task.parent_task_id == None) if status_id: query = query.filter(Task.status_id == status_id) if assignee_id: query = query.filter(Task.assignee_id == assignee_id) tasks = query.order_by(Task.position, Task.created_at).all() return TaskListResponse( tasks=[task_to_response(t) for t in tasks], total=len(tasks), ) @router.post("/api/projects/{project_id}/tasks", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) async def create_task( project_id: str, task_data: TaskCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Create a new task in a project. """ project = db.query(Project).filter(Project.id == project_id).first() if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Project not found", ) if not check_project_access(current_user, project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied", ) # Validate parent task and check depth if task_data.parent_task_id: parent_task = db.query(Task).filter( Task.id == task_data.parent_task_id, Task.project_id == project_id ).first() if not parent_task: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Parent task not found in this project", ) # Check depth limit parent_depth = get_task_depth(db, parent_task) if parent_depth >= MAX_SUBTASK_DEPTH: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Maximum subtask depth ({MAX_SUBTASK_DEPTH}) exceeded", ) # Validate status_id belongs to this project if task_data.status_id: status_obj = db.query(TaskStatus).filter( TaskStatus.id == task_data.status_id, TaskStatus.project_id == project_id ).first() if not status_obj: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Status not found in this project", ) else: # Use first status (To Do) as default default_status = db.query(TaskStatus).filter( TaskStatus.project_id == project_id ).order_by(TaskStatus.position).first() task_data.status_id = default_status.id if default_status else None # Get max position max_pos_result = db.query(Task).filter( Task.project_id == project_id, Task.parent_task_id == task_data.parent_task_id ).order_by(Task.position.desc()).first() next_position = (max_pos_result.position + 1) if max_pos_result else 0 task = Task( id=str(uuid.uuid4()), project_id=project_id, parent_task_id=task_data.parent_task_id, title=task_data.title, description=task_data.description, priority=task_data.priority.value if task_data.priority else "medium", original_estimate=task_data.original_estimate, due_date=task_data.due_date, assignee_id=task_data.assignee_id, status_id=task_data.status_id, position=next_position, created_by=current_user.id, ) db.add(task) db.commit() db.refresh(task) return task @router.get("/api/tasks/{task_id}", response_model=TaskWithDetails) async def get_task( task_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Get a task by ID. """ task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) if not check_task_access(current_user, task, task.project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied", ) return task_to_response(task) @router.patch("/api/tasks/{task_id}", response_model=TaskResponse) async def update_task( task_id: str, task_data: TaskUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Update a task. """ task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) if not check_task_edit_access(current_user, task, task.project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied", ) # Update fields update_data = task_data.model_dump(exclude_unset=True) for field, value in update_data.items(): if field == "priority" and value: setattr(task, field, value.value) else: setattr(task, field, value) db.commit() db.refresh(task) return task @router.delete("/api/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_task( task_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Delete a task (cascades to subtasks). """ task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) if not check_task_edit_access(current_user, task, task.project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied", ) db.delete(task) db.commit() return None @router.patch("/api/tasks/{task_id}/status", response_model=TaskResponse) async def update_task_status( task_id: str, status_data: TaskStatusUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Update task status. """ task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) if not check_task_edit_access(current_user, task, task.project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied", ) # Validate new status belongs to same project new_status = db.query(TaskStatus).filter( TaskStatus.id == status_data.status_id, TaskStatus.project_id == task.project_id ).first() if not new_status: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Status not found in this project", ) task.status_id = status_data.status_id # Auto-set blocker_flag based on status name if new_status.name.lower() == "blocked": task.blocker_flag = True else: task.blocker_flag = False db.commit() db.refresh(task) return task @router.patch("/api/tasks/{task_id}/assign", response_model=TaskResponse) async def assign_task( task_id: str, assign_data: TaskAssignUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Assign or unassign a task. """ task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) if not check_task_edit_access(current_user, task, task.project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied", ) # Validate assignee exists if provided if assign_data.assignee_id: assignee = db.query(User).filter(User.id == assign_data.assignee_id).first() if not assignee: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Assignee not found", ) task.assignee_id = assign_data.assignee_id db.commit() db.refresh(task) return task @router.get("/api/tasks/{task_id}/subtasks", response_model=TaskListResponse) async def list_subtasks( task_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ List subtasks of a task. """ task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) if not check_task_access(current_user, task, task.project): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied", ) subtasks = db.query(Task).filter( Task.parent_task_id == task_id ).order_by(Task.position, Task.created_at).all() return TaskListResponse( tasks=[task_to_response(t) for t in subtasks], total=len(subtasks), )