from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from typing import Optional from app.core.database import get_db from app.core.security import decode_access_token from app.core.redis import get_redis from app.models.user import User security = HTTPBearer() async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db), redis_client=Depends(get_redis), ) -> User: """ Dependency to get the current authenticated user. Validates the JWT token and checks session in Redis. """ token = credentials.credentials # Decode and verify token payload = decode_access_token(token) if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) user_id = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload", headers={"WWW-Authenticate": "Bearer"}, ) # Check session in Redis stored_token = redis_client.get(f"session:{user_id}") if stored_token is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Session expired or invalid", headers={"WWW-Authenticate": "Bearer"}, ) # Handle Redis bytes type - decode if necessary if isinstance(stored_token, bytes): stored_token = stored_token.decode("utf-8") if stored_token != token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Session expired or invalid", headers={"WWW-Authenticate": "Bearer"}, ) # Get user from database user = db.query(User).filter(User.id == user_id).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled", ) return user async def get_current_active_user( current_user: User = Depends(get_current_user), ) -> User: """ Dependency to ensure user is active. """ if not current_user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user", ) return current_user def require_system_admin( current_user: User = Depends(get_current_user), ) -> User: """ Dependency to require system admin privileges. """ if not current_user.is_system_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="System admin privileges required", ) return current_user def require_permission(permission: str): """ Decorator factory to require specific permission. Usage: @router.get("/protected") async def protected_route(user: User = Depends(require_permission("users.read"))): ... """ def permission_checker(current_user: User = Depends(get_current_user)) -> User: # System admin has all permissions if current_user.is_system_admin: return current_user # Check role permissions if current_user.role is None: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No role assigned", ) permissions = current_user.role.permissions or {} # Check for "all" permission if permissions.get("all"): return current_user # Check specific permission if not permissions.get(permission): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permission '{permission}' required", ) return current_user return permission_checker def check_department_access( user: User, resource_department_id: Optional[str], resource_security_level: str = "department", ) -> bool: """ Check if user has access to a resource based on department isolation. Args: user: The current user resource_department_id: Department ID of the resource resource_security_level: Security level ('public', 'department', 'confidential') Returns: bool: True if user has access, False otherwise """ # System admin bypasses department isolation if user.is_system_admin: return True # Public resources are accessible to all if resource_security_level == "public": return True # No department specified on resource means accessible to all if resource_department_id is None: return True # User must be in the same department if user.department_id == resource_department_id: return True return False def check_space_access(user: User, space) -> bool: """ Check if user has access to a space. Currently all active users can see all spaces. Owner has edit/delete permissions. """ # System admin has full access if user.is_system_admin: return True # All active users can view spaces return True def check_space_edit_access(user: User, space) -> bool: """ Check if user can edit/delete a space. """ # System admin has full access if user.is_system_admin: return True # Only owner can edit return space.owner_id == user.id def check_project_access(user: User, project) -> bool: """ Check if user has access to a project based on security level and membership. Access is granted if any of the following conditions are met: 1. User is a system admin 2. User is the project owner 3. User is an explicit project member (cross-department collaboration) 4. Security level allows access: - public: All logged-in users - department: Same department users - confidential: Only owner/members/admin """ # System admin bypasses all restrictions if user.is_system_admin: return True # Project owner always has access if project.owner_id == user.id: return True # Check if user is an explicit project member (for cross-department collaboration) # This allows users from other departments to access the project if hasattr(project, 'members') and project.members: for member in project.members: if member.user_id == user.id: return True # Check by security level security_level = project.security_level if security_level == "public": return True elif security_level == "department": # Same department has access if project.department_id and user.department_id == project.department_id: return True return False else: # confidential # Only owner/members have access (already checked above) return False def check_project_edit_access(user: User, project) -> bool: """ Check if user can edit/delete a project. Edit access is granted if: 1. User is a system admin 2. User is the project owner 3. User is a project member with 'admin' role """ # System admin has full access if user.is_system_admin: return True # Owner can edit if project.owner_id == user.id: return True # Project member with admin role can edit if hasattr(project, 'members') and project.members: for member in project.members: if member.user_id == user.id and member.role == "admin": return True return False def check_task_access(user: User, task, project) -> bool: """ Check if user has access to a task. Task access is based on project access. """ return check_project_access(user, project) def check_task_edit_access(user: User, task, project) -> bool: """ Check if user can edit a task. """ # System admin has full access if user.is_system_admin: return True # Project owner can edit all tasks if project.owner_id == user.id: return True # Task creator can edit their own tasks if task.created_by == user.id: return True # Assignee can edit their assigned tasks if task.assignee_id == user.id: return True return False