from fastapi import APIRouter, Depends, HTTPException, status, Query, Request from sqlalchemy.orm import Session from sqlalchemy import or_ from typing import List from app.core.database import get_db from app.models.user import User from app.models.role import Role from app.models import AuditAction from app.schemas.user import UserResponse, UserUpdate from app.middleware.auth import ( get_current_user, require_permission, require_system_admin, check_department_access, ) from app.middleware.audit import get_audit_metadata from app.services.audit_service import AuditService router = APIRouter() @router.get("/search", response_model=List[UserResponse]) async def search_users( q: str = Query(..., min_length=1, max_length=100, description="Search query"), limit: int = Query(10, ge=1, le=50, description="Max results"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Search users by name or email. Used for @mention autocomplete. Returns users matching the query, limited to same department unless system admin. """ query = db.query(User).filter( User.is_active == True, or_( User.name.ilike(f"%{q}%"), User.email.ilike(f"%{q}%"), ) ) # Filter by department if not system admin if not current_user.is_system_admin and current_user.department_id: query = query.filter(User.department_id == current_user.department_id) users = query.limit(limit).all() return users @router.get("", response_model=List[UserResponse]) async def list_users( skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: User = Depends(require_permission("users.read")), ): """ List all users. Filtered by department if not system admin. """ query = db.query(User) # Filter by department if not system admin if not current_user.is_system_admin and current_user.department_id: query = query.filter(User.department_id == current_user.department_id) users = query.offset(skip).limit(limit).all() return users @router.get("/{user_id}", response_model=UserResponse) async def get_user( user_id: str, db: Session = Depends(get_db), current_user: User = Depends(require_permission("users.read")), ): """ Get a specific user by ID. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Check department access if not check_department_access(current_user, user.department_id): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user", ) return user @router.patch("/{user_id}", response_model=UserResponse) async def update_user( user_id: str, user_update: UserUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_permission("users.write")), ): """ Update user information. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Check department access if not check_department_access(current_user, user.department_id): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user", ) # Prevent modification of system admin properties by non-system-admins if user.is_system_admin and not current_user.is_system_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify system administrator", ) # Update fields update_data = user_update.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(user, field, value) db.commit() db.refresh(user) return user @router.patch("/{user_id}/role", response_model=UserResponse) async def assign_role( user_id: str, role_id: str, request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_system_admin), ): """ Assign a role to a user. Requires system admin. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Prevent modification of system admin if user.is_system_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify system administrator role", ) # Verify role exists role = db.query(Role).filter(Role.id == role_id).first() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Role not found", ) # Prevent assigning system role to non-system-admin if role.is_system_role: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot assign system role", ) old_role_id = user.role_id user.role_id = role_id # Audit log for role change (high sensitivity) if old_role_id != role_id: AuditService.log_event( db=db, event_type="user.role_change", resource_type="user", action=AuditAction.UPDATE, user_id=current_user.id, resource_id=user.id, changes=[{"field": "role_id", "old_value": old_role_id, "new_value": role_id}], request_metadata=get_audit_metadata(request), ) db.commit() db.refresh(user) return user @router.patch("/{user_id}/admin", response_model=UserResponse) async def set_admin_status( user_id: str, is_admin: bool, request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_system_admin), ): """ Set or revoke system administrator status. Requires system admin. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Prevent self-modification if user.id == current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify your own admin status", ) old_admin_status = user.is_system_admin user.is_system_admin = is_admin # Audit log for admin change (critical sensitivity, triggers alert) if old_admin_status != is_admin: AuditService.log_event( db=db, event_type="user.admin_change", resource_type="user", action=AuditAction.UPDATE, user_id=current_user.id, resource_id=user.id, changes=[{"field": "is_system_admin", "old_value": old_admin_status, "new_value": is_admin}], request_metadata=get_audit_metadata(request), ) db.commit() db.refresh(user) return user