feat: complete LOW priority code quality improvements

Backend:
- LOW-002: Add Query validation with max page size limits (100)
- LOW-003: Replace magic strings with TaskStatus.is_done flag
- LOW-004: Add 'creation' trigger type validation
- Add action_executor.py with UpdateFieldAction and AutoAssignAction

Frontend:
- LOW-005: Replace TypeScript 'any' with 'unknown' + type guards
- LOW-006: Add ConfirmModal component with A11Y support
- LOW-007: Add ToastContext for user feedback notifications
- LOW-009: Add Skeleton components (17 loading states replaced)
- LOW-010: Setup Vitest with 21 tests for ConfirmModal and Skeleton

Components updated:
- App.tsx, ProtectedRoute.tsx, Spaces.tsx, Projects.tsx, Tasks.tsx
- ProjectSettings.tsx, AuditPage.tsx, WorkloadPage.tsx, ProjectHealthPage.tsx
- Comments.tsx, AttachmentList.tsx, TriggerList.tsx, TaskDetailModal.tsx
- NotificationBell.tsx, BlockerDialog.tsx, CalendarView.tsx, WorkloadUserDetail.tsx

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2026-01-07 21:24:36 +08:00
parent 2d80a8384e
commit 4b5a9c1d0a
66 changed files with 7809 additions and 171 deletions

View File

@@ -118,6 +118,9 @@ def should_encrypt_file(project: Project, db: Session) -> tuple[bool, Optional[E
Returns:
Tuple of (should_encrypt, encryption_key)
Raises:
HTTPException: If project is confidential but encryption is not available
"""
# Only encrypt for confidential projects
if project.security_level != "confidential":
@@ -125,11 +128,14 @@ def should_encrypt_file(project: Project, db: Session) -> tuple[bool, Optional[E
# Check if encryption is available
if not encryption_service.is_encryption_available():
logger.warning(
logger.error(
f"Project {project.id} is confidential but encryption is not configured. "
"Files will be stored unencrypted."
"Rejecting file upload to maintain security."
)
raise HTTPException(
status_code=400,
detail="Confidential project requires encryption. Please configure ENCRYPTION_MASTER_KEY environment variable."
)
return False, None
# Get active encryption key
active_key = db.query(EncryptionKey).filter(
@@ -137,11 +143,14 @@ def should_encrypt_file(project: Project, db: Session) -> tuple[bool, Optional[E
).first()
if not active_key:
logger.warning(
logger.error(
f"Project {project.id} is confidential but no active encryption key exists. "
"Files will be stored unencrypted. Create a key using /api/admin/encryption-keys/rotate"
"Rejecting file upload to maintain security."
)
raise HTTPException(
status_code=400,
detail="Confidential project requires encryption. Please create an active encryption key first."
)
return False, None
return True, active_key

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List
@@ -13,8 +13,8 @@ router = APIRouter()
@router.get("", response_model=List[DepartmentResponse])
async def list_departments(
skip: int = 0,
limit: int = 100,
skip: int = Query(0, ge=0, description="Number of departments to skip"),
limit: int = Query(100, ge=1, le=200, description="Max departments to return"),
db: Session = Depends(get_db),
current_user: User = Depends(require_permission("users.read")),
):

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import Optional
@@ -71,8 +71,8 @@ async def generate_weekly_report(
@router.get("/api/reports/history", response_model=ReportHistoryListResponse)
async def list_report_history(
limit: int = 10,
offset: int = 0,
limit: int = Query(10, ge=1, le=100, description="Number of reports to return"),
offset: int = Query(0, ge=0, description="Number of reports to skip"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):

View File

@@ -1,5 +1,5 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import Optional
@@ -11,6 +11,8 @@ from app.schemas.trigger import (
)
from app.middleware.auth import get_current_user, check_project_access, check_project_edit_access
from app.services.trigger_scheduler import TriggerSchedulerService
from app.services.trigger_service import TriggerService
from app.services.action_executor import ActionValidationError
router = APIRouter(tags=["triggers"])
@@ -60,10 +62,11 @@ async def create_trigger(
)
# Validate trigger type
if trigger_data.trigger_type not in ["field_change", "schedule"]:
valid_trigger_types = ["field_change", "schedule", "creation"]
if trigger_data.trigger_type not in valid_trigger_types:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid trigger type. Must be 'field_change' or 'schedule'",
detail=f"Invalid trigger type. Must be one of: {', '.join(valid_trigger_types)}",
)
# Validate conditions based on trigger type
@@ -111,6 +114,16 @@ async def create_trigger(
detail=error_msg or "Invalid cron expression",
)
# Validate actions configuration (FEAT-014, FEAT-015)
try:
actions_dicts = [a.model_dump(exclude_none=True) for a in trigger_data.actions]
TriggerService.validate_actions(actions_dicts, db)
except ActionValidationError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
# Create trigger
trigger = Trigger(
id=str(uuid.uuid4()),
@@ -119,7 +132,7 @@ async def create_trigger(
description=trigger_data.description,
trigger_type=trigger_data.trigger_type,
conditions=trigger_data.conditions.model_dump(),
actions=[a.model_dump() for a in trigger_data.actions],
actions=[a.model_dump(exclude_none=True) for a in trigger_data.actions],
is_active=trigger_data.is_active,
created_by=current_user.id,
)
@@ -239,7 +252,16 @@ async def update_trigger(
)
trigger.conditions = trigger_data.conditions.model_dump(exclude_none=True)
if trigger_data.actions is not None:
trigger.actions = [a.model_dump() for a in trigger_data.actions]
# Validate actions configuration (FEAT-014, FEAT-015)
try:
actions_dicts = [a.model_dump(exclude_none=True) for a in trigger_data.actions]
TriggerService.validate_actions(actions_dicts, db)
except ActionValidationError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
trigger.actions = [a.model_dump(exclude_none=True) for a in trigger_data.actions]
if trigger_data.is_active is not None:
trigger.is_active = trigger_data.is_active
@@ -278,8 +300,8 @@ async def delete_trigger(
@router.get("/api/triggers/{trigger_id}/logs", response_model=TriggerLogListResponse)
async def list_trigger_logs(
trigger_id: str,
limit: int = 50,
offset: int = 0,
limit: int = Query(50, ge=1, le=200, description="Number of logs to return"),
offset: int = Query(0, ge=0, description="Number of logs to skip"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):

View File

@@ -50,8 +50,8 @@ async def search_users(
@router.get("", response_model=List[UserResponse])
async def list_users(
skip: int = 0,
limit: int = 100,
skip: int = Query(0, ge=0, description="Number of users to skip"),
limit: int = Query(100, ge=1, le=500, description="Max users to return"),
db: Session = Depends(get_db),
current_user: User = Depends(require_permission("users.read")),
):

View File

@@ -1,4 +1,4 @@
from pydantic import BaseModel
from pydantic import BaseModel, computed_field
from typing import Optional, List, Any, Dict
from datetime import datetime
from decimal import Decimal
@@ -79,6 +79,12 @@ class TaskResponse(TaskBase):
class Config:
from_attributes = True
# Alias for original_estimate for frontend compatibility
@computed_field
@property
def time_estimate(self) -> Optional[Decimal]:
return self.original_estimate
class TaskWithDetails(TaskResponse):
assignee_name: Optional[str] = None

View File

@@ -28,9 +28,23 @@ class TriggerCondition(BaseModel):
class TriggerAction(BaseModel):
type: str = Field(default="notify", description="Action type: notify")
target: str = Field(default="assignee", description="Target: assignee, creator, project_owner, user:<id>")
"""Action configuration for triggers.
Supported action types:
- notify: Send notification (requires target, optional template)
- update_field: Update task field (requires field, value)
- auto_assign: Auto-assign task (requires strategy, optional user_id for specific_user)
"""
type: str = Field(..., description="Action type: notify, update_field, auto_assign")
# Notify action fields
target: Optional[str] = Field(None, description="Target: assignee, creator, project_owner, user:<id>")
template: Optional[str] = Field(None, description="Message template with variables")
# update_field action fields (FEAT-014)
field: Optional[str] = Field(None, description="Field to update: priority, status_id, due_date")
value: Optional[Any] = Field(None, description="New value for the field")
# auto_assign action fields (FEAT-015)
strategy: Optional[str] = Field(None, description="Strategy: round_robin, least_loaded, specific_user")
user_id: Optional[str] = Field(None, description="User ID for specific_user strategy")
class TriggerCreate(BaseModel):

View File

@@ -0,0 +1,484 @@
"""Action executor service for automation triggers.
This module provides the action execution framework for the automation system.
It supports extensible action types through a registry pattern.
FEAT-014: update_field - Update task field values
FEAT-015: auto_assign - Automatic task assignment with multiple strategies
"""
import logging
import uuid
from abc import ABC, abstractmethod
from datetime import datetime, date
from decimal import Decimal
from typing import Any, Dict, List, Optional, Type
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.models import Task, User, TaskStatus
logger = logging.getLogger(__name__)
class ActionExecutionError(Exception):
"""Exception raised when action execution fails."""
pass
class ActionValidationError(Exception):
"""Exception raised when action config validation fails."""
pass
class BaseAction(ABC):
"""Base class for all action types."""
action_type: str = ""
@abstractmethod
def validate_config(self, config: Dict[str, Any], db: Session) -> None:
"""Validate action configuration.
Args:
config: Action configuration dict
db: Database session for validation queries
Raises:
ActionValidationError: If config is invalid
"""
pass
@abstractmethod
def execute(
self,
db: Session,
task: Task,
config: Dict[str, Any],
context: Dict[str, Any],
) -> Dict[str, Any]:
"""Execute the action.
Args:
db: Database session
task: Target task
config: Action configuration
context: Execution context (old_values, new_values, current_user, etc.)
Returns:
Dict with execution result details
Raises:
ActionExecutionError: If execution fails
"""
pass
class UpdateFieldAction(BaseAction):
"""Action to update task field values (FEAT-014).
Supported fields:
- priority: low, medium, high, urgent
- status_id: Valid status ID for the task's project
- due_date: ISO format date string
Config format:
{
"field": "priority",
"value": "high"
}
"""
action_type = "update_field"
# Standard fields that can be updated
UPDATABLE_FIELDS = {
"priority": ["low", "medium", "high", "urgent"],
"status_id": None, # Validated dynamically
"due_date": None, # Date string validation
}
def validate_config(self, config: Dict[str, Any], db: Session) -> None:
"""Validate update_field configuration."""
field = config.get("field")
value = config.get("value")
if not field:
raise ActionValidationError("Missing required 'field' in update_field config")
if value is None:
raise ActionValidationError("Missing required 'value' in update_field config")
if field not in self.UPDATABLE_FIELDS:
raise ActionValidationError(
f"Invalid field '{field}'. Supported fields: {list(self.UPDATABLE_FIELDS.keys())}"
)
# Validate priority values
if field == "priority":
valid_values = self.UPDATABLE_FIELDS["priority"]
if value not in valid_values:
raise ActionValidationError(
f"Invalid priority value '{value}'. Valid values: {valid_values}"
)
# Validate due_date format
if field == "due_date" and value:
try:
if isinstance(value, str):
datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
raise ActionValidationError(
f"Invalid due_date format '{value}'. Use ISO format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS)"
)
def execute(
self,
db: Session,
task: Task,
config: Dict[str, Any],
context: Dict[str, Any],
) -> Dict[str, Any]:
"""Execute field update action."""
field = config["field"]
value = config["value"]
old_value = getattr(task, field, None)
logger.info(
f"Executing update_field action: task={task.id}, field={field}, "
f"old_value={old_value}, new_value={value}"
)
# Validate status_id exists for this project
if field == "status_id" and value:
status = db.query(TaskStatus).filter(
TaskStatus.id == value,
TaskStatus.project_id == task.project_id,
).first()
if not status:
raise ActionExecutionError(
f"Status ID '{value}' not found in project {task.project_id}"
)
# Convert due_date string to datetime
if field == "due_date" and value:
if isinstance(value, str):
value = datetime.fromisoformat(value.replace("Z", "+00:00"))
# Update the field
setattr(task, field, value)
task.updated_at = datetime.utcnow()
return {
"action_type": self.action_type,
"status": "success",
"field": field,
"old_value": str(old_value) if old_value else None,
"new_value": str(value) if value else None,
}
class AutoAssignAction(BaseAction):
"""Action for automatic task assignment (FEAT-015).
Strategies:
- round_robin: Assign to project members in rotation
- least_loaded: Assign to member with lowest workload
- specific_user: Assign to a specific user
Config format:
{
"strategy": "round_robin" | "least_loaded" | "specific_user",
"user_id": "xxx" // Required only for specific_user strategy
}
"""
action_type = "auto_assign"
VALID_STRATEGIES = ["round_robin", "least_loaded", "specific_user"]
# Class-level state for round-robin tracking per project
_round_robin_index: Dict[str, int] = {}
def validate_config(self, config: Dict[str, Any], db: Session) -> None:
"""Validate auto_assign configuration."""
strategy = config.get("strategy")
if not strategy:
raise ActionValidationError("Missing required 'strategy' in auto_assign config")
if strategy not in self.VALID_STRATEGIES:
raise ActionValidationError(
f"Invalid strategy '{strategy}'. Valid strategies: {self.VALID_STRATEGIES}"
)
if strategy == "specific_user":
user_id = config.get("user_id")
if not user_id:
raise ActionValidationError(
"Missing required 'user_id' for specific_user strategy"
)
# Validate user exists
user = db.query(User).filter(
User.id == user_id,
User.is_active == True,
).first()
if not user:
raise ActionValidationError(f"User '{user_id}' not found or inactive")
def execute(
self,
db: Session,
task: Task,
config: Dict[str, Any],
context: Dict[str, Any],
) -> Dict[str, Any]:
"""Execute auto-assign action."""
strategy = config["strategy"]
old_assignee_id = task.assignee_id
logger.info(
f"Executing auto_assign action: task={task.id}, strategy={strategy}, "
f"old_assignee={old_assignee_id}"
)
if strategy == "specific_user":
new_assignee_id = self._assign_specific_user(db, config)
elif strategy == "round_robin":
new_assignee_id = self._assign_round_robin(db, task)
elif strategy == "least_loaded":
new_assignee_id = self._assign_least_loaded(db, task)
else:
raise ActionExecutionError(f"Unknown strategy: {strategy}")
if new_assignee_id:
task.assignee_id = new_assignee_id
task.updated_at = datetime.utcnow()
# Get assignee name for logging
assignee = db.query(User).filter(User.id == new_assignee_id).first()
assignee_name = assignee.name if assignee else "Unknown"
logger.info(
f"Task {task.id} assigned to user {new_assignee_id} ({assignee_name}) "
f"using {strategy} strategy"
)
return {
"action_type": self.action_type,
"status": "success",
"strategy": strategy,
"old_assignee_id": old_assignee_id,
"new_assignee_id": new_assignee_id,
"assignee_name": assignee_name,
}
else:
logger.warning(
f"No eligible assignee found for task {task.id} using {strategy} strategy"
)
return {
"action_type": self.action_type,
"status": "skipped",
"strategy": strategy,
"reason": "No eligible assignee found",
}
def _assign_specific_user(self, db: Session, config: Dict[str, Any]) -> Optional[str]:
"""Assign to a specific user."""
user_id = config["user_id"]
user = db.query(User).filter(
User.id == user_id,
User.is_active == True,
).first()
return user.id if user else None
def _assign_round_robin(self, db: Session, task: Task) -> Optional[str]:
"""Assign using round-robin strategy among project members.
Gets all active users in the same department as the project owner,
then rotates through them.
"""
project = task.project
if not project:
return None
# Get project members: owner + users in same department
members = self._get_project_members(db, project)
if not members:
return None
# Get or initialize round-robin index for this project
project_id = project.id
if project_id not in self._round_robin_index:
self._round_robin_index[project_id] = 0
# Get next member in rotation
index = self._round_robin_index[project_id] % len(members)
selected_user = members[index]
# Update index for next assignment
self._round_robin_index[project_id] = (index + 1) % len(members)
return selected_user.id
def _assign_least_loaded(self, db: Session, task: Task) -> Optional[str]:
"""Assign to the project member with lowest workload.
Workload is calculated based on number of incomplete tasks assigned.
"""
project = task.project
if not project:
return None
members = self._get_project_members(db, project)
if not members:
return None
# Calculate workload for each member (count of incomplete tasks)
member_workloads = []
for member in members:
# Count incomplete tasks (status.is_done = False or no status)
incomplete_count = (
db.query(func.count(Task.id))
.outerjoin(TaskStatus, Task.status_id == TaskStatus.id)
.filter(
Task.assignee_id == member.id,
Task.is_deleted == False,
(TaskStatus.is_done == False) | (Task.status_id == None),
)
.scalar()
)
member_workloads.append((member, incomplete_count))
# Sort by workload (ascending) and return the least loaded member
member_workloads.sort(key=lambda x: x[1])
if member_workloads:
selected_user, workload = member_workloads[0]
logger.debug(
f"Least loaded assignment: user={selected_user.id}, "
f"current_workload={workload}"
)
return selected_user.id
return None
def _get_project_members(self, db: Session, project) -> List[User]:
"""Get all potential assignees for a project.
Returns active users who are either:
- The project owner
- In the same department as the project
"""
owner_id = project.owner_id
department_id = project.department_id
query = db.query(User).filter(User.is_active == True)
if department_id:
# Get users in same department OR project owner
query = query.filter(
(User.department_id == department_id) | (User.id == owner_id)
)
else:
# No department, just return owner
query = query.filter(User.id == owner_id)
return query.all()
class ActionExecutor:
"""Registry and executor for action types.
Usage:
executor = ActionExecutor()
result = executor.execute_action(db, task, action_config, context)
"""
_actions: Dict[str, Type[BaseAction]] = {}
@classmethod
def register(cls, action_class: Type[BaseAction]) -> None:
"""Register an action type."""
cls._actions[action_class.action_type] = action_class
logger.debug(f"Registered action type: {action_class.action_type}")
@classmethod
def get_supported_actions(cls) -> List[str]:
"""Get list of supported action types."""
return list(cls._actions.keys())
@classmethod
def validate_action(cls, action: Dict[str, Any], db: Session) -> None:
"""Validate an action configuration.
Args:
action: Action dict with 'type' and other config
db: Database session
Raises:
ActionValidationError: If action is invalid
"""
action_type = action.get("type")
if not action_type:
raise ActionValidationError("Missing action 'type'")
if action_type not in cls._actions:
# Allow unknown actions (like 'notify') to pass through
return
action_class = cls._actions[action_type]
action_instance = action_class()
config = action.get("config", action) # Support both nested and flat config
action_instance.validate_config(config, db)
@classmethod
def execute_action(
cls,
db: Session,
task: Task,
action: Dict[str, Any],
context: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
"""Execute a single action.
Args:
db: Database session
task: Target task
action: Action dict with 'type' and other config
context: Execution context
Returns:
Execution result dict or None if action type not registered
Raises:
ActionExecutionError: If execution fails
"""
action_type = action.get("type")
if action_type not in cls._actions:
# Not a registered action (might be 'notify' handled elsewhere)
return None
action_class = cls._actions[action_type]
action_instance = action_class()
# Extract config - support both nested and flat config
config = action.get("config", action)
try:
result = action_instance.execute(db, task, config, context)
return result
except ActionExecutionError:
raise
except Exception as e:
logger.exception(f"Unexpected error executing {action_type} action")
raise ActionExecutionError(f"Failed to execute {action_type}: {str(e)}")
# Register built-in actions
ActionExecutor.register(UpdateFieldAction)
ActionExecutor.register(AutoAssignAction)

View File

@@ -87,16 +87,17 @@ class ReportService:
next_week_tasks = []
for task in all_tasks:
status_name = task.status.name.lower() if task.status else ""
is_done = status_name in ["done", "completed", "完成"]
# Use TaskStatus.is_done flag instead of magic strings
is_done = task.status.is_done if task.status else False
# Check if completed (updated this week)
if is_done:
if task.updated_at and task.updated_at >= week_start:
completed_tasks.append(task)
else:
# Check if in progress
if status_name in ["in progress", "進行中", "doing"]:
# Check if task has active status (not done, not blocked)
# Tasks without a done status are considered in progress
if task.status and not task.status.is_done:
in_progress_tasks.append(task)
# Check if overdue

View File

@@ -1,9 +1,17 @@
import uuid
import logging
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from app.models import Trigger, TriggerLog, Task, User, Project
from app.services.notification_service import NotificationService
from app.services.action_executor import (
ActionExecutor,
ActionExecutionError,
ActionValidationError,
)
logger = logging.getLogger(__name__)
class TriggerService:
@@ -74,23 +82,78 @@ class TriggerService:
old_values: Dict[str, Any],
new_values: Dict[str, Any],
) -> TriggerLog:
"""Execute trigger actions and log the result."""
"""Execute trigger actions and log the result.
Uses a database savepoint to ensure atomicity - if any action fails,
all previously executed actions within this trigger are rolled back.
"""
actions = trigger.actions if isinstance(trigger.actions, list) else [trigger.actions]
executed_actions = []
error_message = None
# Build execution context
context = {
"old_values": old_values,
"new_values": new_values,
"current_user": current_user,
"trigger": trigger,
}
# Use savepoint for transaction atomicity - if any action fails,
# all changes made by previous actions will be rolled back
savepoint = db.begin_nested()
try:
for action in actions:
action_type = action.get("type")
# Handle built-in notify action
if action_type == "notify":
TriggerService._execute_notify_action(db, action, task, current_user, old_values, new_values)
executed_actions.append({"type": action_type, "status": "success"})
# Handle update_field action (FEAT-014)
elif action_type == "update_field":
result = ActionExecutor.execute_action(db, task, action, context)
if result:
executed_actions.append(result)
logger.info(
f"Trigger '{trigger.name}' executed update_field: "
f"field={result.get('field')}, new_value={result.get('new_value')}"
)
# Handle auto_assign action (FEAT-015)
elif action_type == "auto_assign":
result = ActionExecutor.execute_action(db, task, action, context)
if result:
executed_actions.append(result)
logger.info(
f"Trigger '{trigger.name}' executed auto_assign: "
f"strategy={result.get('strategy')}, assignee={result.get('new_assignee_id')}"
)
# Try to execute via ActionExecutor for extensibility
else:
result = ActionExecutor.execute_action(db, task, action, context)
if result:
executed_actions.append(result)
# All actions succeeded, commit the savepoint
savepoint.commit()
status = "success"
except Exception as e:
except ActionExecutionError as e:
# Rollback all changes made by previously executed actions
savepoint.rollback()
status = "failed"
error_message = str(e)
executed_actions.append({"type": "error", "message": str(e)})
logger.error(f"Trigger '{trigger.name}' action execution failed, rolling back: {e}")
except Exception as e:
# Rollback all changes made by previously executed actions
savepoint.rollback()
status = "failed"
error_message = str(e)
executed_actions.append({"type": "error", "message": str(e)})
logger.exception(f"Trigger '{trigger.name}' unexpected error, rolling back: {e}")
log = TriggerLog(
id=str(uuid.uuid4()),
@@ -198,3 +261,36 @@ class TriggerService:
)
db.add(log)
return log
@staticmethod
def validate_actions(actions: List[Dict[str, Any]], db: Session) -> None:
"""Validate trigger actions configuration.
Args:
actions: List of action configurations
db: Database session
Raises:
ActionValidationError: If any action is invalid
"""
valid_action_types = ["notify", "update_field", "auto_assign"]
for action in actions:
action_type = action.get("type")
if not action_type:
raise ActionValidationError("Missing action 'type'")
if action_type not in valid_action_types:
raise ActionValidationError(
f"Invalid action type '{action_type}'. "
f"Valid types: {valid_action_types}"
)
# Validate via ActionExecutor for extensible actions
if action_type in ["update_field", "auto_assign"]:
ActionExecutor.validate_action(action, db)
@staticmethod
def get_supported_action_types() -> List[str]:
"""Get list of all supported action types."""
return ["notify"] + ActionExecutor.get_supported_actions()