""" Service for managing task custom values. """ import uuid from typing import List, Dict, Any, Optional from decimal import Decimal, InvalidOperation from datetime import datetime from sqlalchemy.orm import Session from app.models import Task, CustomField, TaskCustomValue, User from app.schemas.task import CustomValueInput, CustomValueResponse from app.services.formula_service import FormulaService class CustomValueService: """Service for managing custom field values on tasks.""" @staticmethod def get_custom_values_for_task( db: Session, task: Task, include_formula_calculations: bool = True, ) -> List[CustomValueResponse]: """ Get all custom field values for a task. Args: db: Database session task: The task to get values for include_formula_calculations: Whether to calculate formula field values Returns: List of CustomValueResponse objects """ # Get all custom fields for the project fields = db.query(CustomField).filter( CustomField.project_id == task.project_id ).order_by(CustomField.position).all() if not fields: return [] # Get stored values stored_values = db.query(TaskCustomValue).filter( TaskCustomValue.task_id == task.id ).all() value_map = {v.field_id: v.value for v in stored_values} # Calculate formula values if requested formula_values = {} if include_formula_calculations: formula_values = FormulaService.calculate_all_formulas_for_task(db, task) result = [] for field in fields: if field.field_type == "formula": # Use calculated formula value calculated = formula_values.get(field.id) value = str(calculated) if calculated is not None else None display_value = CustomValueService._format_display_value( field, value, db ) else: # Use stored value value = value_map.get(field.id) display_value = CustomValueService._format_display_value( field, value, db ) result.append(CustomValueResponse( field_id=field.id, field_name=field.name, field_type=field.field_type, value=value, display_value=display_value, )) return result @staticmethod def _format_display_value( field: CustomField, value: Optional[str], db: Session, ) -> Optional[str]: """Format a value for display based on field type.""" if value is None: return None field_type = field.field_type if field_type == "person": # Look up user name from app.models import User user = db.query(User).filter(User.id == value).first() return user.name if user else value elif field_type == "number" or field_type == "formula": # Format number try: num = Decimal(value) # Remove trailing zeros after decimal point formatted = f"{num:,.4f}".rstrip('0').rstrip('.') return formatted except (InvalidOperation, ValueError): return value elif field_type == "date": # Format date try: dt = datetime.fromisoformat(value.replace('Z', '+00:00')) return dt.strftime('%Y-%m-%d') except (ValueError, AttributeError): return value else: return value @staticmethod def save_custom_values( db: Session, task: Task, custom_values: List[CustomValueInput], ) -> List[str]: """ Save custom field values for a task. Args: db: Database session task: The task to save values for custom_values: List of values to save Returns: List of field IDs that were updated (for formula recalculation) """ if not custom_values: return [] updated_field_ids = [] for cv in custom_values: field = db.query(CustomField).filter( CustomField.id == cv.field_id, CustomField.project_id == task.project_id, ).first() if not field: continue # Skip formula fields - they are calculated, not stored directly if field.field_type == "formula": continue # Validate value based on field type validated_value = CustomValueService._validate_value( field, cv.value, db ) # Find existing value or create new existing = db.query(TaskCustomValue).filter( TaskCustomValue.task_id == task.id, TaskCustomValue.field_id == cv.field_id, ).first() if existing: if existing.value != validated_value: existing.value = validated_value updated_field_ids.append(cv.field_id) else: new_value = TaskCustomValue( id=str(uuid.uuid4()), task_id=task.id, field_id=cv.field_id, value=validated_value, ) db.add(new_value) updated_field_ids.append(cv.field_id) # Recalculate formula fields if any values were updated if updated_field_ids: for field_id in updated_field_ids: FormulaService.recalculate_dependent_formulas(db, task, field_id) return updated_field_ids @staticmethod def _validate_value( field: CustomField, value: Any, db: Session, ) -> Optional[str]: """ Validate and normalize a value based on field type. Returns the validated value as a string, or None. """ if value is None or value == "": if field.is_required: raise ValueError(f"Field '{field.name}' is required") return None field_type = field.field_type str_value = str(value) if field_type == "text": return str_value elif field_type == "number": try: Decimal(str_value) return str_value except (InvalidOperation, ValueError): raise ValueError(f"Invalid number for field '{field.name}'") elif field_type == "dropdown": if field.options and str_value not in field.options: raise ValueError( f"Invalid option for field '{field.name}'. " f"Must be one of: {', '.join(field.options)}" ) return str_value elif field_type == "date": # Validate date format try: datetime.fromisoformat(str_value.replace('Z', '+00:00')) return str_value except ValueError: # Try parsing as date only try: datetime.strptime(str_value, '%Y-%m-%d') return str_value except ValueError: raise ValueError(f"Invalid date for field '{field.name}'") elif field_type == "person": # Validate user exists from app.models import User user = db.query(User).filter(User.id == str_value).first() if not user: raise ValueError(f"Invalid user ID for field '{field.name}'") return str_value return str_value @staticmethod def validate_required_fields( db: Session, project_id: str, custom_values: Optional[List[CustomValueInput]], ) -> List[str]: """ Validate that all required custom fields have values. Returns list of missing required field names. """ required_fields = db.query(CustomField).filter( CustomField.project_id == project_id, CustomField.is_required == True, CustomField.field_type != "formula", # Formula fields are calculated ).all() if not required_fields: return [] provided_field_ids = set() if custom_values: for cv in custom_values: if cv.value is not None and cv.value != "": provided_field_ids.add(cv.field_id) missing = [] for field in required_fields: if field.id not in provided_field_ids: missing.append(field.name) return missing