"""Room service for managing incident rooms Handles business logic for room CRUD operations """ from sqlalchemy.orm import Session from sqlalchemy import or_, and_, func from typing import List, Optional, Dict from datetime import datetime import uuid from app.modules.chat_room.models import IncidentRoom, RoomMember, RoomStatus, MemberRole from app.modules.chat_room.schemas import CreateRoomRequest, UpdateRoomRequest, RoomFilterParams class RoomService: """Service for room management operations""" def create_room( self, db: Session, user_id: str, room_data: CreateRoomRequest ) -> IncidentRoom: """Create a new incident room Args: db: Database session user_id: ID of user creating the room room_data: Room creation data Returns: Created room instance """ # Create room room = IncidentRoom( room_id=str(uuid.uuid4()), title=room_data.title, incident_type=room_data.incident_type, severity=room_data.severity, location=room_data.location, description=room_data.description, status=RoomStatus.ACTIVE, created_by=user_id, created_at=datetime.utcnow(), last_activity_at=datetime.utcnow(), last_updated_at=datetime.utcnow(), member_count=1 ) db.add(room) # Add creator as owner owner = RoomMember( room_id=room.room_id, user_id=user_id, role=MemberRole.OWNER, added_by=user_id, added_at=datetime.utcnow() ) db.add(owner) db.commit() db.refresh(room) return room def get_room( self, db: Session, room_id: str, user_id: str, is_admin: bool = False ) -> Optional[IncidentRoom]: """Get room details Args: db: Database session room_id: Room ID user_id: User requesting access is_admin: Whether user is system admin Returns: Room instance if user has access, None otherwise """ room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if not room: return None # Check access: admin or member if not is_admin: member = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if not member: return None return room def list_user_rooms( self, db: Session, user_id: str, filters: RoomFilterParams, is_admin: bool = False ) -> List[IncidentRoom]: """List rooms accessible to user with filters Args: db: Database session user_id: User ID filters: Filter parameters is_admin: Whether user is system admin Returns: List of accessible rooms """ query = db.query(IncidentRoom) # Access control: admin sees all, others see only their rooms if not is_admin or not filters.all: # Join with room_members to filter by membership query = query.join(RoomMember).filter( and_( RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ) # Apply filters if filters.status: query = query.filter(IncidentRoom.status == filters.status) if filters.incident_type: query = query.filter(IncidentRoom.incident_type == filters.incident_type) if filters.severity: query = query.filter(IncidentRoom.severity == filters.severity) if filters.created_after: query = query.filter(IncidentRoom.created_at >= filters.created_after) if filters.created_before: query = query.filter(IncidentRoom.created_at <= filters.created_before) if filters.search: search_term = f"%{filters.search}%" query = query.filter( or_( IncidentRoom.title.ilike(search_term), IncidentRoom.description.ilike(search_term) ) ) # Order by last activity (most recent first) query = query.order_by(IncidentRoom.last_activity_at.desc()) # Apply pagination total = query.count() rooms = query.offset(filters.offset).limit(filters.limit).all() return rooms, total def update_room( self, db: Session, room_id: str, updates: UpdateRoomRequest ) -> Optional[IncidentRoom]: """Update room metadata Args: db: Database session room_id: Room ID updates: Update data Returns: Updated room or None if not found """ room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if not room: return None # Apply updates if updates.title is not None: room.title = updates.title if updates.severity is not None: room.severity = updates.severity if updates.location is not None: room.location = updates.location if updates.description is not None: room.description = updates.description if updates.resolution_notes is not None: room.resolution_notes = updates.resolution_notes # Handle status transitions if updates.status is not None: if not self._validate_status_transition(room.status, updates.status): raise ValueError(f"Invalid status transition from {room.status} to {updates.status}") room.status = updates.status # Update timestamps based on status if updates.status == RoomStatus.RESOLVED: room.resolved_at = datetime.utcnow() elif updates.status == RoomStatus.ARCHIVED: room.archived_at = datetime.utcnow() # Update activity timestamps room.last_updated_at = datetime.utcnow() room.last_activity_at = datetime.utcnow() db.commit() db.refresh(room) return room def change_room_status( self, db: Session, room_id: str, new_status: RoomStatus ) -> Optional[IncidentRoom]: """Change room status with validation Args: db: Database session room_id: Room ID new_status: New status Returns: Updated room or None """ room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if not room: return None if not self._validate_status_transition(room.status, new_status): raise ValueError(f"Invalid status transition from {room.status} to {new_status}") room.status = new_status # Update timestamps if new_status == RoomStatus.RESOLVED: room.resolved_at = datetime.utcnow() elif new_status == RoomStatus.ARCHIVED: room.archived_at = datetime.utcnow() room.last_updated_at = datetime.utcnow() room.last_activity_at = datetime.utcnow() db.commit() db.refresh(room) return room def search_rooms( self, db: Session, user_id: str, search_term: str, is_admin: bool = False ) -> List[IncidentRoom]: """Search rooms by title or description Args: db: Database session user_id: User ID search_term: Search string is_admin: Whether user is system admin Returns: List of matching rooms """ query = db.query(IncidentRoom) # Access control if not is_admin: query = query.join(RoomMember).filter( and_( RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ) # Search filter search_pattern = f"%{search_term}%" query = query.filter( or_( IncidentRoom.title.ilike(search_pattern), IncidentRoom.description.ilike(search_pattern) ) ) return query.order_by(IncidentRoom.last_activity_at.desc()).all() def delete_room( self, db: Session, room_id: str ) -> bool: """Soft delete a room (archive it) Args: db: Database session room_id: Room ID Returns: True if deleted, False if not found """ room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if not room: return False room.status = RoomStatus.ARCHIVED room.archived_at = datetime.utcnow() room.last_updated_at = datetime.utcnow() db.commit() return True def _validate_status_transition( self, current_status: RoomStatus, new_status: RoomStatus ) -> bool: """Validate status transition Valid transitions: - active -> resolved - resolved -> archived - active -> archived (allowed but not recommended) Args: current_status: Current status new_status: New status Returns: True if valid, False otherwise """ valid_transitions = { RoomStatus.ACTIVE: [RoomStatus.RESOLVED, RoomStatus.ARCHIVED], RoomStatus.RESOLVED: [RoomStatus.ARCHIVED], RoomStatus.ARCHIVED: [] # No transitions from archived } return new_status in valid_transitions.get(current_status, []) def update_room_activity( self, db: Session, room_id: str ) -> None: """Update room's last activity timestamp Args: db: Database session room_id: Room ID """ room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if room: room.last_activity_at = datetime.utcnow() db.commit() # Create singleton instance room_service = RoomService()