Files
PROJECT-CONTORL/backend/tests/test_audit.py
beabigegg 0ef78e13ff feat: implement audit trail module
- Backend (FastAPI):
  - AuditLog and AuditAlert models with Alembic migration
  - AuditService with SHA-256 checksum for log integrity
  - AuditMiddleware for request metadata extraction (IP, user_agent)
  - Integrated audit logging into Task, Project, Blocker APIs
  - Query API with filtering, pagination, CSV export
  - Integrity verification endpoint
  - Sensitive operation alerts with acknowledgement

- Frontend (React + Vite):
  - Admin AuditPage with filters and export
  - ResourceHistory component for change tracking
  - Audit service for API calls

- Testing:
  - 15 tests covering service and API endpoints

- OpenSpec:
  - add-audit-trail change archived

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 21:21:18 +08:00

343 lines
12 KiB
Python

import pytest
import uuid
from datetime import datetime, timedelta
from app.models import User, AuditLog, AuditAlert, AuditAction, SensitivityLevel
from app.services.audit_service import AuditService
@pytest.fixture
def admin_user(db):
    """Insert an active system-admin ``User`` row and return it after commit."""
    fields = {
        "id": str(uuid.uuid4()),
        "email": "testadmin@example.com",
        "name": "Test Admin",
        "role_id": "00000000-0000-0000-0000-000000000001",
        "is_active": True,
        "is_system_admin": True,
    }
    admin = User(**fields)
    db.add(admin)
    db.commit()
    return admin
class TestAuditService:
    """Unit tests for ``AuditService`` helpers and event logging."""

    @staticmethod
    def _log_task_create(db, user, **extra):
        """Log a ``task.create`` event for ``user`` against a fresh random resource id.

        Any ``extra`` keyword arguments are forwarded to ``AuditService.log_event``.
        """
        return AuditService.log_event(
            db=db,
            event_type="task.create",
            resource_type="task",
            action=AuditAction.CREATE,
            user_id=user.id,
            resource_id=str(uuid.uuid4()),
            **extra,
        )

    def test_calculate_checksum(self):
        """Identical inputs yield the same 64-character checksum."""
        timestamp = datetime.utcnow()
        args = ("task.create", "resource-1", "user-1", None, timestamp)
        first = AuditService.calculate_checksum(*args)
        second = AuditService.calculate_checksum(*args)
        # Same inputs should produce same checksum
        assert first == second
        assert len(first) == 64  # SHA-256 hex length

    def test_checksum_different_for_different_inputs(self):
        """Changing only the event type changes the checksum."""
        timestamp = datetime.utcnow()
        created, updated = (
            AuditService.calculate_checksum(
                event_type, "resource-1", "user-1", None, timestamp
            )
            for event_type in ("task.create", "task.update")
        )
        assert created != updated

    def test_detect_changes(self):
        """Only the field whose value differs is reported."""
        before = {"title": "Old Title", "priority": "high"}
        after = {"title": "New Title", "priority": "high"}
        diff = AuditService.detect_changes(before, after)
        assert len(diff) == 1
        only_change = diff[0]
        assert only_change["field"] == "title"
        assert only_change["old_value"] == "Old Title"
        assert only_change["new_value"] == "New Title"

    def test_detect_no_changes(self):
        """Comparing a dict with an equal copy reports nothing."""
        snapshot = {"title": "Same Title", "priority": "high"}
        diff = AuditService.detect_changes(snapshot, dict(snapshot))
        assert len(diff) == 0

    def test_get_sensitivity_level(self):
        """Each event type maps to its expected sensitivity level."""
        expected_levels = [
            ("task.create", SensitivityLevel.LOW),
            ("task.delete", SensitivityLevel.MEDIUM),
            ("project.delete", SensitivityLevel.HIGH),
            ("user.permission_change", SensitivityLevel.CRITICAL),
            # An event type the service does not know is expected to be LOW.
            ("unknown.event", SensitivityLevel.LOW),
        ]
        for event_type, level in expected_levels:
            assert AuditService.get_sensitivity_level(event_type) == level

    def test_log_event(self, db, admin_user):
        """A logged event carries an id, the given fields, and a checksum."""
        entry = self._log_task_create(
            db,
            admin_user,
            changes=[{"field": "title", "old_value": None, "new_value": "Test Task"}],
        )
        db.commit()
        assert entry.id is not None
        assert entry.event_type == "task.create"
        assert entry.action == "create"
        assert entry.sensitivity_level == "low"
        assert entry.checksum is not None

    def test_verify_checksum_valid(self, db, admin_user):
        """An untouched, freshly stored log passes checksum verification."""
        entry = self._log_task_create(db, admin_user)
        db.commit()
        db.refresh(entry)
        assert AuditService.verify_checksum(entry) is True

    def test_verify_checksum_invalid(self, db, admin_user):
        """Overwriting the stored checksum makes verification fail."""
        entry = self._log_task_create(db, admin_user)
        db.commit()
        db.refresh(entry)
        # Tamper with the checksum
        entry.checksum = "0" * 64
        assert AuditService.verify_checksum(entry) is False
@pytest.fixture
def regular_user(db):
    """Insert an active ``User`` with ``is_system_admin=False`` and return it after commit."""
    fields = {
        "id": str(uuid.uuid4()),
        "email": "regular@example.com",
        "name": "Regular User",
        "role_id": "00000000-0000-0000-0000-000000000003",
        "is_active": True,
        "is_system_admin": False,
    }
    member = User(**fields)
    db.add(member)
    db.commit()
    return member
@pytest.fixture
def regular_user_token(client, mock_redis, regular_user):
    """Create an access token for ``regular_user`` and store it under its session key.

    The token is written to ``mock_redis`` as ``session:<user id>`` with a
    ``setex`` time argument of 900, then returned to the test.
    """
    from app.core.security import create_access_token, create_token_payload

    payload = create_token_payload(
        user_id=regular_user.id,
        email=regular_user.email,
        role="engineer",
        department_id=None,
        is_system_admin=False,
    )
    access_token = create_access_token(payload)
    session_key = f"session:{regular_user.id}"
    mock_redis.setex(session_key, 900, access_token)
    return access_token
class TestAuditAPI:
    """HTTP-level tests for the audit log and audit alert endpoints."""

    # User id written on every hand-built audit row in this class.
    # NOTE(review): presumably the seeded admin behind ``admin_token`` -- confirm in conftest.
    ADMIN_USER_ID = "00000000-0000-0000-0000-000000000001"

    @staticmethod
    def _auth(token):
        """Return the bearer ``Authorization`` header dict for ``token``."""
        return {"Authorization": f"Bearer {token}"}

    def _add_log(
        self,
        db,
        *,
        event_type="task.create",
        resource_type="task",
        resource_id=None,
        action="create",
        sensitivity_level="low",
    ):
        """Build an ``AuditLog`` row, add it to ``db`` (without committing), and return it.

        ``resource_id`` defaults to a fresh random UUID string. The checksum is
        a placeholder of 64 zeros, so these rows are not expected to pass
        integrity verification.
        """
        if resource_id is None:
            resource_id = str(uuid.uuid4())
        log = AuditLog(
            id=str(uuid.uuid4()),
            event_type=event_type,
            resource_type=resource_type,
            resource_id=resource_id,
            user_id=self.ADMIN_USER_ID,
            action=action,
            sensitivity_level=sensitivity_level,
            checksum="0" * 64,
        )
        db.add(log)
        return log

    def test_list_audit_logs_requires_admin(self, client, regular_user_token):
        """Test that non-admin users cannot access audit logs."""
        response = client.get(
            "/api/audit-logs",
            headers=self._auth(regular_user_token),
        )
        assert response.status_code == 403
        assert "Admin access required" in response.json()["detail"]

    def test_list_audit_logs(self, client, admin_token, db):
        """Test listing audit logs as admin."""
        # Create some audit logs
        for _ in range(3):
            self._add_log(db)
        db.commit()

        response = client.get(
            "/api/audit-logs",
            headers=self._auth(admin_token),
        )
        assert response.status_code == 200
        data = response.json()
        assert data["total"] >= 3
        assert len(data["logs"]) >= 3

    def test_list_audit_logs_with_filters(self, client, admin_token, db):
        """Test filtering audit logs by ``resource_type``."""
        # Create logs with different resource types
        for resource_type in ["task", "project", "task"]:
            self._add_log(
                db,
                event_type=f"{resource_type}.create",
                resource_type=resource_type,
            )
        db.commit()

        response = client.get(
            "/api/audit-logs?resource_type=project",
            headers=self._auth(admin_token),
        )
        assert response.status_code == 200
        data = response.json()
        # all() is True for an empty list, so first make sure the project row
        # created above actually came back; otherwise a filter that returns
        # nothing would pass this test.
        assert data["logs"], "expected at least one 'project' audit log"
        assert all(log["resource_type"] == "project" for log in data["logs"])

    def test_get_resource_history(self, client, admin_token, db):
        """Test getting resource history."""
        resource_id = str(uuid.uuid4())
        # Create multiple logs for the same resource
        for event in ["task.create", "task.update", "task.update"]:
            self._add_log(
                db,
                event_type=event,
                resource_id=resource_id,
                action="create" if "create" in event else "update",
            )
        db.commit()

        response = client.get(
            f"/api/audit-logs/resource/task/{resource_id}",
            headers=self._auth(admin_token),
        )
        assert response.status_code == 200
        data = response.json()
        assert data["total"] == 3
        assert all(log["resource_id"] == resource_id for log in data["logs"])

    def test_verify_integrity(self, client, admin_token, db):
        """Test integrity verification."""
        now = datetime.utcnow()
        # Create a valid log through the service so it carries a real checksum.
        AuditService.log_event(
            db=db,
            event_type="task.create",
            resource_type="task",
            action=AuditAction.CREATE,
            user_id=self.ADMIN_USER_ID,
            resource_id=str(uuid.uuid4()),
        )
        db.commit()

        # Verify a two-hour window centred on the moment captured above.
        response = client.post(
            "/api/audit-logs/verify-integrity",
            headers=self._auth(admin_token),
            json={
                "start_date": (now - timedelta(hours=1)).isoformat(),
                "end_date": (now + timedelta(hours=1)).isoformat(),
            },
        )
        assert response.status_code == 200
        data = response.json()
        assert data["total_checked"] >= 1
        assert data["invalid_count"] == 0

    def test_acknowledge_alert(self, client, admin_token, db):
        """Test acknowledging an alert."""
        # Create a log and alert
        log = self._add_log(
            db,
            event_type="project.delete",
            resource_type="project",
            action="delete",
            sensitivity_level="high",
        )
        # Flush before building the alert that references log.id.
        db.flush()
        alert = AuditAlert(
            id=str(uuid.uuid4()),
            audit_log_id=log.id,
            alert_type="project.delete",
            recipients=[self.ADMIN_USER_ID],
            message="Test alert",
        )
        db.add(alert)
        db.commit()

        response = client.put(
            f"/api/audit-alerts/{alert.id}/acknowledge",
            headers=self._auth(admin_token),
        )
        assert response.status_code == 200
        data = response.json()
        assert data["is_acknowledged"] is True
        assert data["acknowledged_by"] is not None

    def test_export_csv(self, client, admin_token, db):
        """Test CSV export."""
        # Create some logs
        for _ in range(3):
            self._add_log(db)
        db.commit()

        response = client.get(
            "/api/audit-logs/export",
            headers=self._auth(admin_token),
        )
        assert response.status_code == 200
        assert "text/csv" in response.headers["content-type"]
        assert "attachment" in response.headers.get("content-disposition", "")