feat: implement audit trail alignment (soft delete & permission audit)

- Task Soft Delete:
  - Add is_deleted, deleted_at, deleted_by fields to Task model
  - Convert DELETE to soft delete with cascade to subtasks
  - Add include_deleted query param (admin only)
  - Add POST /api/tasks/{id}/restore endpoint
  - Exclude deleted tasks from subtask_count

- Permission Change Audit:
  - Add user.role_change event (high sensitivity)
  - Add user.admin_change event (critical, triggers alert)
  - Add PATCH /api/users/{id}/admin endpoint
  - Add role.permission_change event type

- Append-Only Enforcement:
  - Add DB triggers for audit_logs immutability (manual for production)
  - Migration 008 with graceful trigger failure handling

- Tests: 11 new soft delete tests (153 total passing)
- OpenSpec: fix-audit-trail archived, fix-realtime-notifications & fix-weekly-report proposals added

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2025-12-30 06:58:30 +08:00
parent 95c281d8e1
commit 10db2c9d1f
18 changed files with 1455 additions and 12 deletions

View File

@@ -0,0 +1,347 @@
import pytest
import uuid
from app.models import User, Space, Project, Task, TaskStatus
@pytest.fixture
def test_admin(db):
"""Create a test admin user."""
user = User(
id=str(uuid.uuid4()),
email="admin@example.com",
name="Admin User",
role_id="00000000-0000-0000-0000-000000000001",
is_active=True,
is_system_admin=True,
)
db.add(user)
db.commit()
return user
@pytest.fixture
def test_regular_user(db):
"""Create a test regular user."""
user = User(
id=str(uuid.uuid4()),
email="regular@example.com",
name="Regular User",
role_id="00000000-0000-0000-0000-000000000003",
is_active=True,
is_system_admin=False,
)
db.add(user)
db.commit()
return user
@pytest.fixture
def admin_token(client, mock_redis, test_admin):
"""Get a token for admin user."""
from app.core.security import create_access_token, create_token_payload
token_data = create_token_payload(
user_id=test_admin.id,
email=test_admin.email,
role="super_admin",
department_id=None,
is_system_admin=True,
)
token = create_access_token(token_data)
mock_redis.setex(f"session:{test_admin.id}", 900, token)
return token
@pytest.fixture
def regular_token(client, mock_redis, test_regular_user):
"""Get a token for regular user."""
from app.core.security import create_access_token, create_token_payload
token_data = create_token_payload(
user_id=test_regular_user.id,
email=test_regular_user.email,
role="engineer",
department_id=None,
is_system_admin=False,
)
token = create_access_token(token_data)
mock_redis.setex(f"session:{test_regular_user.id}", 900, token)
return token
@pytest.fixture
def test_space(db, test_admin):
"""Create a test space."""
space = Space(
id=str(uuid.uuid4()),
name="Test Space",
description="Test space",
owner_id=test_admin.id,
)
db.add(space)
db.commit()
return space
@pytest.fixture
def test_project(db, test_space, test_admin):
"""Create a test project with public access."""
project = Project(
id=str(uuid.uuid4()),
space_id=test_space.id,
title="Test Project",
description="Test project",
owner_id=test_admin.id,
security_level="public", # Allow all users to access
)
db.add(project)
db.commit()
return project
@pytest.fixture
def test_status(db, test_project):
"""Create a test task status."""
status = TaskStatus(
id=str(uuid.uuid4()),
project_id=test_project.id,
name="To Do",
color="#808080",
position=0,
)
db.add(status)
db.commit()
return status
@pytest.fixture
def test_task(db, test_project, test_admin, test_status):
"""Create a test task."""
task = Task(
id=str(uuid.uuid4()),
project_id=test_project.id,
title="Test Task",
status_id=test_status.id,
created_by=test_admin.id,
)
db.add(task)
db.commit()
return task
@pytest.fixture
def test_task_with_subtask(db, test_project, test_admin, test_status, test_task):
"""Create a test task with subtask."""
subtask = Task(
id=str(uuid.uuid4()),
project_id=test_project.id,
parent_task_id=test_task.id,
title="Subtask",
status_id=test_status.id,
created_by=test_admin.id,
)
db.add(subtask)
db.commit()
return subtask
class TestSoftDelete:
"""Tests for soft delete functionality."""
def test_delete_task_soft_deletes(self, client, admin_token, test_task, db):
"""Test that DELETE soft-deletes a task."""
response = client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
data = response.json()
assert data["id"] == test_task.id
# Verify in database
db.refresh(test_task)
assert test_task.is_deleted is True
assert test_task.deleted_at is not None
assert test_task.deleted_by is not None
def test_deleted_task_not_in_list(self, client, admin_token, test_project, test_task, db):
"""Test that deleted tasks are not shown in list."""
# Delete the task
client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# List tasks
response = client.get(
f"/api/projects/{test_project.id}/tasks",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 0
def test_admin_can_list_deleted_with_include_deleted(self, client, admin_token, test_project, test_task, db):
"""Test that admin can see deleted tasks with include_deleted parameter."""
# Delete the task
client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# List with include_deleted
response = client.get(
f"/api/projects/{test_project.id}/tasks?include_deleted=true",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
assert data["tasks"][0]["id"] == test_task.id
def test_regular_user_cannot_see_deleted_with_include_deleted(self, client, regular_token, test_project, test_task, admin_token, db):
"""Test that non-admin cannot see deleted tasks even with include_deleted."""
# Delete the task as admin
client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# Try to list with include_deleted as regular user
response = client.get(
f"/api/projects/{test_project.id}/tasks?include_deleted=true",
headers={"Authorization": f"Bearer {regular_token}"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 0
def test_get_deleted_task_returns_404_for_regular_user(self, client, admin_token, regular_token, test_task, db):
"""Test that getting a deleted task returns 404 for non-admin."""
# Delete the task
client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# Try to get as regular user
response = client.get(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {regular_token}"},
)
assert response.status_code == 404
def test_admin_can_view_deleted_task(self, client, admin_token, test_task, db):
"""Test that admin can view a deleted task."""
# Delete the task
client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# Get as admin
response = client.get(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
def test_cascade_soft_delete_subtasks(self, client, admin_token, test_task, test_task_with_subtask, db):
"""Test that deleting a parent task soft-deletes its subtasks."""
# Delete the parent task
response = client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
# Verify subtask is also soft-deleted
db.refresh(test_task_with_subtask)
assert test_task_with_subtask.is_deleted is True
class TestRestoreTask:
"""Tests for task restoration functionality."""
def test_restore_task(self, client, admin_token, test_task, db):
"""Test that admin can restore a deleted task."""
# Delete the task
client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# Restore the task
response = client.post(
f"/api/tasks/{test_task.id}/restore",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
# Verify in database
db.refresh(test_task)
assert test_task.is_deleted is False
assert test_task.deleted_at is None
assert test_task.deleted_by is None
def test_regular_user_cannot_restore(self, client, admin_token, regular_token, test_task, db):
"""Test that non-admin cannot restore a deleted task."""
# Delete the task
client.delete(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# Try to restore as regular user
response = client.post(
f"/api/tasks/{test_task.id}/restore",
headers={"Authorization": f"Bearer {regular_token}"},
)
assert response.status_code == 403
def test_cannot_restore_non_deleted_task(self, client, admin_token, test_task, db):
"""Test that restoring a non-deleted task returns error."""
response = client.post(
f"/api/tasks/{test_task.id}/restore",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 400
assert "not deleted" in response.json()["detail"]
class TestSubtaskCount:
"""Tests for subtask count excluding deleted."""
def test_subtask_count_excludes_deleted(self, client, admin_token, test_task, test_task_with_subtask, db):
"""Test that subtask_count excludes deleted subtasks."""
# Get parent task before deletion
response = client.get(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
assert response.json()["subtask_count"] == 1
# Delete subtask
client.delete(
f"/api/tasks/{test_task_with_subtask.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
# Get parent task after deletion
response = client.get(
f"/api/tasks/{test_task.id}",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
assert response.json()["subtask_count"] == 0