Files
PROJECT-CONTORL/backend/tests/test_triggers.py
beabigegg 4b5a9c1d0a feat: complete LOW priority code quality improvements
Backend:
- LOW-002: Add Query validation with max page size limits (100)
- LOW-003: Replace magic strings with TaskStatus.is_done flag
- LOW-004: Add 'creation' trigger type validation
- Add action_executor.py with UpdateFieldAction and AutoAssignAction

Frontend:
- LOW-005: Replace TypeScript 'any' with 'unknown' + type guards
- LOW-006: Add ConfirmModal component with A11Y support
- LOW-007: Add ToastContext for user feedback notifications
- LOW-009: Add Skeleton components (17 loading states replaced)
- LOW-010: Setup Vitest with 21 tests for ConfirmModal and Skeleton

Components updated:
- App.tsx, ProtectedRoute.tsx, Spaces.tsx, Projects.tsx, Tasks.tsx
- ProjectSettings.tsx, AuditPage.tsx, WorkloadPage.tsx, ProjectHealthPage.tsx
- Comments.tsx, AttachmentList.tsx, TriggerList.tsx, TaskDetailModal.tsx
- NotificationBell.tsx, BlockerDialog.tsx, CalendarView.tsx, WorkloadUserDetail.tsx

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 21:24:36 +08:00

494 lines
16 KiB
Python

import pytest
import uuid
from app.models import User, Space, Project, Task, TaskStatus, Trigger, TriggerLog, Notification
from app.services.trigger_service import TriggerService
@pytest.fixture
def test_user(db):
    """Persist and return an active, non-admin user with a fresh UUID id.

    The user is added to ``db`` and committed before being returned.
    """
    attributes = {
        "id": str(uuid.uuid4()),
        "email": "testuser@example.com",
        "name": "Test User",
        # NOTE(review): presumably the id of a seeded non-admin role; confirm.
        "role_id": "00000000-0000-0000-0000-000000000003",
        "is_active": True,
        "is_system_admin": False,
    }
    account = User(**attributes)
    db.add(account)
    db.commit()
    return account
@pytest.fixture
def test_user_token(client, mock_redis, test_user):
    """Return an access token for ``test_user`` and record it as a session.

    The token is written to ``mock_redis`` under ``session:<user id>`` with
    an expiry argument of 900 before it is returned.
    """
    from app.core.security import create_access_token, create_token_payload

    payload = create_token_payload(
        user_id=test_user.id,
        email=test_user.email,
        role="engineer",
        department_id=None,
        is_system_admin=False,
    )
    access_token = create_access_token(payload)

    session_key = f"session:{test_user.id}"
    # 900 is presumably a TTL in seconds (Redis SETEX semantics); confirm.
    mock_redis.setex(session_key, 900, access_token)
    return access_token
@pytest.fixture
def test_space(db, test_user):
    """Persist and return a space owned by ``test_user``."""
    workspace = Space(
        id=str(uuid.uuid4()),
        name="Test Space",
        description="Test space for triggers",
        owner_id=test_user.id,
    )

    db.add(workspace)
    db.commit()
    return workspace
@pytest.fixture
def test_project(db, test_space, test_user):
    """Persist and return a project inside ``test_space`` owned by ``test_user``."""
    attributes = {
        "id": str(uuid.uuid4()),
        "space_id": test_space.id,
        "title": "Test Project",
        "description": "Test project for triggers",
        "owner_id": test_user.id,
    }
    created = Project(**attributes)
    db.add(created)
    db.commit()
    return created
@pytest.fixture
def test_status(db, test_project):
    """Persist two task statuses for ``test_project`` and return them as a tuple.

    Index 0 is "To Do" (position 0) and index 1 is "In Progress" (position 1).
    """
    # (name, color) pairs; the list index doubles as the status position.
    specs = (
        ("To Do", "#808080"),
        ("In Progress", "#0000FF"),
    )

    # Build every status first, then add them, then commit once.
    statuses = [
        TaskStatus(
            id=str(uuid.uuid4()),
            project_id=test_project.id,
            name=label,
            color=hex_color,
            position=index,
        )
        for index, (label, hex_color) in enumerate(specs)
    ]
    for status in statuses:
        db.add(status)
    db.commit()

    return tuple(statuses)
@pytest.fixture
def test_task(db, test_project, test_user, test_status):
    """Persist and return a task in ``test_project``.

    The task starts in the first status of ``test_status`` and is both
    created by and assigned to ``test_user``.
    """
    initial_status = test_status[0]
    new_task = Task(
        id=str(uuid.uuid4()),
        project_id=test_project.id,
        title="Test Task",
        description="Test task for triggers",
        status_id=initial_status.id,
        created_by=test_user.id,
        assignee_id=test_user.id,
    )

    db.add(new_task)
    db.commit()
    return new_task
@pytest.fixture
def test_trigger(db, test_project, test_user, test_status):
    """Persist and return an active ``field_change`` trigger for ``test_project``.

    Its condition is ``status_id`` ``changed_to`` the second status of
    ``test_status``; its single action is a ``notify`` aimed at the assignee.
    """
    target_status = test_status[1]
    condition = {
        "field": "status_id",
        "operator": "changed_to",
        "value": target_status.id,
    }
    notify_assignee = {
        "type": "notify",
        "target": "assignee",
        "template": "Task {task_title} status changed",
    }

    created = Trigger(
        id=str(uuid.uuid4()),
        project_id=test_project.id,
        name="Status Change Trigger",
        description="Notify when status changes to In Progress",
        trigger_type="field_change",
        conditions=condition,
        actions=[notify_assignee],
        is_active=True,
        created_by=test_user.id,
    )
    db.add(created)
    db.commit()
    return created
class TestTriggerService:
    """Exercise ``TriggerService`` condition checks and trigger evaluation."""

    @staticmethod
    def _status_condition(operator, value):
        """Build a condition dict on ``status_id`` for the given operator/value."""
        return {
            "field": "status_id",
            "operator": operator,
            "value": value,
        }

    def test_check_conditions_changed_to(self, db, test_status):
        """``changed_to`` holds when the new status is the target value."""
        todo, in_progress = test_status[0], test_status[1]
        condition = self._status_condition("changed_to", in_progress.id)
        before = {"status_id": todo.id}
        after = {"status_id": in_progress.id}

        matched = TriggerService._check_conditions(condition, before, after)

        assert matched is True

    def test_check_conditions_changed_to_no_match(self, db, test_status):
        """``changed_to`` fails when the status moves away from the target."""
        todo, in_progress = test_status[0], test_status[1]
        condition = self._status_condition("changed_to", in_progress.id)
        before = {"status_id": in_progress.id}
        after = {"status_id": todo.id}

        matched = TriggerService._check_conditions(condition, before, after)

        assert matched is False

    def test_check_conditions_equals(self, db, test_status):
        """``equals`` holds when the new status is the configured value."""
        todo, in_progress = test_status[0], test_status[1]
        condition = self._status_condition("equals", in_progress.id)
        before = {"status_id": todo.id}
        after = {"status_id": in_progress.id}

        matched = TriggerService._check_conditions(condition, before, after)

        assert matched is True

    def test_check_conditions_not_equals(self, db, test_status):
        """``not_equals`` holds when the new status differs from the value."""
        todo, in_progress = test_status[0], test_status[1]
        condition = self._status_condition("not_equals", todo.id)
        before = {"status_id": todo.id}
        after = {"status_id": in_progress.id}

        matched = TriggerService._check_conditions(condition, before, after)

        assert matched is True

    def test_evaluate_triggers_creates_notification(self, db, test_task, test_trigger, test_user, test_status):
        """A matching trigger logs success and notifies the task's assignee."""
        # Reassign the task to a second user, who should get the notification.
        recipient = User(
            id=str(uuid.uuid4()),
            email="other@example.com",
            name="Other User",
            role_id="00000000-0000-0000-0000-000000000003",
            is_active=True,
        )
        db.add(recipient)
        test_task.assignee_id = recipient.id
        db.commit()

        before = {"status_id": test_status[0].id}
        after = {"status_id": test_status[1].id}
        logs = TriggerService.evaluate_triggers(db, test_task, before, after, test_user)
        db.commit()

        assert len(logs) == 1
        assert logs[0].status == "success"

        # Exactly one notification must exist for the new assignee.
        recipient_filter = Notification.user_id == recipient.id
        notifications = db.query(Notification).filter(recipient_filter).all()
        assert len(notifications) == 1

    def test_evaluate_triggers_inactive_trigger_not_executed(self, db, test_task, test_trigger, test_user, test_status):
        """A deactivated trigger produces no log entries."""
        test_trigger.is_active = False
        db.commit()

        before = {"status_id": test_status[0].id}
        after = {"status_id": test_status[1].id}
        logs = TriggerService.evaluate_triggers(db, test_task, before, after, test_user)

        assert len(logs) == 0
class TestTriggerAPI:
    """Exercise the trigger HTTP endpoints through the test client."""

    @staticmethod
    def _auth_headers(token):
        """Return the bearer-token Authorization header for ``token``."""
        return {"Authorization": f"Bearer {token}"}

    def test_create_trigger(self, client, test_user_token, test_project, test_status):
        """POSTing a valid trigger returns 201 and an active trigger."""
        payload = {
            "name": "New Trigger",
            "description": "Test trigger",
            "trigger_type": "field_change",
            "conditions": {
                "field": "status_id",
                "operator": "changed_to",
                "value": test_status[1].id,
            },
            "actions": [{
                "type": "notify",
                "target": "assignee",
            }],
        }

        response = client.post(
            f"/api/projects/{test_project.id}/triggers",
            headers=self._auth_headers(test_user_token),
            json=payload,
        )

        assert response.status_code == 201
        body = response.json()
        assert body["name"] == "New Trigger"
        assert body["is_active"] is True

    def test_list_triggers(self, client, test_user_token, test_project, test_trigger):
        """Listing a project's triggers returns at least the fixture trigger."""
        response = client.get(
            f"/api/projects/{test_project.id}/triggers",
            headers=self._auth_headers(test_user_token),
        )

        assert response.status_code == 200
        body = response.json()
        assert body["total"] >= 1
        assert len(body["triggers"]) >= 1

    def test_get_trigger(self, client, test_user_token, test_trigger):
        """Fetching a trigger by id returns its id and name."""
        response = client.get(
            f"/api/triggers/{test_trigger.id}",
            headers=self._auth_headers(test_user_token),
        )

        assert response.status_code == 200
        body = response.json()
        assert body["id"] == test_trigger.id
        assert body["name"] == test_trigger.name

    def test_update_trigger(self, client, test_user_token, test_trigger):
        """PUTting new values returns 200 with the updated name and flag."""
        changes = {
            "name": "Updated Trigger",
            "is_active": False,
        }

        response = client.put(
            f"/api/triggers/{test_trigger.id}",
            headers=self._auth_headers(test_user_token),
            json=changes,
        )

        assert response.status_code == 200
        body = response.json()
        assert body["name"] == "Updated Trigger"
        assert body["is_active"] is False

    def test_delete_trigger(self, client, test_user_token, test_trigger):
        """Deleting returns 204 and a follow-up GET returns 404."""
        delete_response = client.delete(
            f"/api/triggers/{test_trigger.id}",
            headers=self._auth_headers(test_user_token),
        )
        assert delete_response.status_code == 204

        # The trigger must no longer be retrievable.
        lookup_response = client.get(
            f"/api/triggers/{test_trigger.id}",
            headers=self._auth_headers(test_user_token),
        )
        assert lookup_response.status_code == 404

    def test_get_trigger_logs(self, client, test_user_token, test_trigger, db):
        """The logs endpoint reports at least the one log row inserted here."""
        # Seed a single log row for the fixture trigger.
        entry = TriggerLog(
            id=str(uuid.uuid4()),
            trigger_id=test_trigger.id,
            status="success",
            details={"test": True},
        )
        db.add(entry)
        db.commit()

        response = client.get(
            f"/api/triggers/{test_trigger.id}/logs",
            headers=self._auth_headers(test_user_token),
        )

        assert response.status_code == 200
        body = response.json()
        assert body["total"] >= 1

    def test_create_trigger_invalid_field(self, client, test_user_token, test_project):
        """An unknown condition field is rejected with 400 and a field error."""
        payload = {
            "name": "Invalid Trigger",
            "trigger_type": "field_change",
            "conditions": {
                "field": "invalid_field",
                "operator": "equals",
                "value": "test",
            },
            "actions": [{"type": "notify", "target": "assignee"}],
        }

        response = client.post(
            f"/api/projects/{test_project.id}/triggers",
            headers=self._auth_headers(test_user_token),
            json=payload,
        )

        assert response.status_code == 400
        assert "Invalid condition field" in response.json()["detail"]

    def test_create_trigger_invalid_operator(self, client, test_user_token, test_project):
        """An unknown operator is rejected with 400 and an operator error."""
        payload = {
            "name": "Invalid Trigger",
            "trigger_type": "field_change",
            "conditions": {
                "field": "status_id",
                "operator": "invalid_op",
                "value": "test",
            },
            "actions": [{"type": "notify", "target": "assignee"}],
        }

        response = client.post(
            f"/api/projects/{test_project.id}/triggers",
            headers=self._auth_headers(test_user_token),
            json=payload,
        )

        assert response.status_code == 400
        assert "Invalid operator" in response.json()["detail"]
class TestTriggerActionRollback:
    """Check that a trigger's actions are applied all-or-nothing."""

    def test_multi_action_rollback_on_failure(self, db, test_task, test_project, test_user, test_status):
        """A failing later action must undo the earlier actions' changes.

        The trigger carries two ``update_field`` actions: the first sets
        ``priority`` to ``high``, the second sets ``status_id`` to an id that
        does not exist. The test expects one ``failed`` log whose error
        message contains "not found", and expects the task's priority to be
        the same as before the trigger ran.
        """
        # Snapshot the value the first action would overwrite.
        priority_before = test_task.priority

        set_priority_high = {
            "type": "update_field",
            "field": "priority",
            "value": "high",
        }
        # Expected to fail: no status with this id exists.
        set_unknown_status = {
            "type": "update_field",
            "field": "status_id",
            "value": "non-existent-status-id",
        }
        failing_trigger = Trigger(
            id=str(uuid.uuid4()),
            project_id=test_project.id,
            name="Multi-Action Trigger",
            description="Test rollback on failure",
            trigger_type="field_change",
            conditions={
                "field": "status_id",
                "operator": "changed_to",
                "value": test_status[1].id,
            },
            actions=[set_priority_high, set_unknown_status],
            is_active=True,
            created_by=test_user.id,
        )
        db.add(failing_trigger)
        db.commit()

        before = {"status_id": test_status[0].id}
        after = {"status_id": test_status[1].id}

        # Run the trigger; the second action is the one expected to fail.
        logs = TriggerService.evaluate_triggers(db, test_task, before, after, test_user)
        db.commit()

        # The single log entry must record the failure and its cause.
        assert len(logs) == 1
        assert logs[0].status == "failed"
        assert logs[0].error_message is not None
        assert "not found" in logs[0].error_message.lower()

        # Reload the task so the assertion sees the persisted state.
        db.refresh(test_task)

        # The first action's priority change must not have survived.
        assert test_task.priority == priority_before, (
            f"Expected priority to be rolled back to '{priority_before}', "
            f"but got '{test_task.priority}'"
        )

    def test_all_actions_succeed_committed(self, db, test_task, test_project, test_user, test_status):
        """When every action succeeds, the task change is persisted."""
        set_priority_urgent = {
            "type": "update_field",
            "field": "priority",
            "value": "urgent",
        }
        succeeding_trigger = Trigger(
            id=str(uuid.uuid4()),
            project_id=test_project.id,
            name="Success Trigger",
            description="Test successful commit",
            trigger_type="field_change",
            conditions={
                "field": "status_id",
                "operator": "changed_to",
                "value": test_status[1].id,
            },
            actions=[set_priority_urgent],
            is_active=True,
            created_by=test_user.id,
        )
        db.add(succeeding_trigger)
        db.commit()

        before = {"status_id": test_status[0].id}
        after = {"status_id": test_status[1].id}

        logs = TriggerService.evaluate_triggers(db, test_task, before, after, test_user)
        db.commit()

        # One successful log entry is expected.
        assert len(logs) == 1
        assert logs[0].status == "success"

        # Reload the task and confirm the new priority was stored.
        db.refresh(test_task)
        assert test_task.priority == "urgent"