"""Tests for the audit service helpers and the audit HTTP API endpoints."""

import uuid
from datetime import datetime, timedelta

import pytest

from app.models import User, AuditLog, AuditAlert, AuditAction, SensitivityLevel
from app.services.audit_service import AuditService

# User id that directly-inserted audit rows are attributed to.
# NOTE(review): presumably the seeded admin that `admin_token` authenticates
# as -- confirm against the shared conftest fixtures.
AUDIT_ACTOR_ID = "00000000-0000-0000-0000-000000000001"

# Dummy 64-character checksum for rows inserted without going through
# AuditService.log_event (same length as a SHA-256 hex digest).
PLACEHOLDER_CHECKSUM = "0" * 64


def _add_audit_log(
    db,
    *,
    event_type,
    resource_type,
    action,
    sensitivity_level="low",
    resource_id=None,
):
    """Build an AuditLog row with a placeholder checksum and add it to ``db``.

    The row is only added to the session; the caller decides when to
    flush or commit.

    Args:
        db: Database session fixture.
        event_type: Event name, e.g. ``"task.create"``.
        resource_type: Resource kind, e.g. ``"task"``.
        action: Action string stored on the row, e.g. ``"create"``.
        sensitivity_level: Sensitivity string stored on the row.
        resource_id: Resource identifier; a random UUID string when omitted.

    Returns:
        The AuditLog instance that was added to the session.
    """
    log = AuditLog(
        id=str(uuid.uuid4()),
        event_type=event_type,
        resource_type=resource_type,
        resource_id=resource_id if resource_id is not None else str(uuid.uuid4()),
        user_id=AUDIT_ACTOR_ID,
        action=action,
        sensitivity_level=sensitivity_level,
        checksum=PLACEHOLDER_CHECKSUM,
    )
    db.add(log)
    return log


@pytest.fixture
def admin_user(db):
    """Create a test admin user."""
    user = User(
        id=str(uuid.uuid4()),
        email="testadmin@example.com",
        name="Test Admin",
        role_id="00000000-0000-0000-0000-000000000001",
        is_active=True,
        is_system_admin=True,
    )
    db.add(user)
    db.commit()
    return user


class TestAuditService:
    """Tests for AuditService."""

    def test_calculate_checksum(self):
        """Identical inputs yield the same 64-character checksum."""
        now = datetime.utcnow()

        checksum1 = AuditService.calculate_checksum(
            "task.create", "resource-1", "user-1", None, now
        )
        checksum2 = AuditService.calculate_checksum(
            "task.create", "resource-1", "user-1", None, now
        )

        # Same inputs should produce same checksum
        assert checksum1 == checksum2
        assert len(checksum1) == 64  # SHA-256 hex length

    def test_checksum_different_for_different_inputs(self):
        """Changing only the event type changes the checksum."""
        now = datetime.utcnow()

        checksum1 = AuditService.calculate_checksum(
            "task.create", "resource-1", "user-1", None, now
        )
        checksum2 = AuditService.calculate_checksum(
            "task.update", "resource-1", "user-1", None, now
        )

        assert checksum1 != checksum2

    def test_detect_changes(self):
        """Only the field whose value differs is reported as a change."""
        old_values = {"title": "Old Title", "priority": "high"}
        new_values = {"title": "New Title", "priority": "high"}

        changes = AuditService.detect_changes(old_values, new_values)

        assert len(changes) == 1
        assert changes[0]["field"] == "title"
        assert changes[0]["old_value"] == "Old Title"
        assert changes[0]["new_value"] == "New Title"

    def test_detect_no_changes(self):
        """No changes are reported when old and new values are equal."""
        values = {"title": "Same Title", "priority": "high"}

        changes = AuditService.detect_changes(values, values.copy())

        assert len(changes) == 0

    def test_get_sensitivity_level(self):
        """Event types map to the expected level; unknown events map to LOW."""
        assert AuditService.get_sensitivity_level("task.create") == SensitivityLevel.LOW
        assert AuditService.get_sensitivity_level("task.delete") == SensitivityLevel.MEDIUM
        assert AuditService.get_sensitivity_level("project.delete") == SensitivityLevel.HIGH
        assert (
            AuditService.get_sensitivity_level("user.permission_change")
            == SensitivityLevel.CRITICAL
        )
        assert AuditService.get_sensitivity_level("unknown.event") == SensitivityLevel.LOW

    def test_log_event(self, db, admin_user):
        """Logging an event persists a row with id, level and checksum set."""
        log = AuditService.log_event(
            db=db,
            event_type="task.create",
            resource_type="task",
            action=AuditAction.CREATE,
            user_id=admin_user.id,
            resource_id=str(uuid.uuid4()),
            changes=[{"field": "title", "old_value": None, "new_value": "Test Task"}],
        )
        db.commit()

        assert log.id is not None
        assert log.event_type == "task.create"
        assert log.action == "create"
        assert log.sensitivity_level == "low"
        assert log.checksum is not None

    def test_verify_checksum_valid(self, db, admin_user):
        """A freshly logged, reloaded event passes checksum verification."""
        log = AuditService.log_event(
            db=db,
            event_type="task.create",
            resource_type="task",
            action=AuditAction.CREATE,
            user_id=admin_user.id,
            resource_id=str(uuid.uuid4()),
        )
        db.commit()
        db.refresh(log)

        assert AuditService.verify_checksum(log) is True

    def test_verify_checksum_invalid(self, db, admin_user):
        """An overwritten checksum fails verification."""
        log = AuditService.log_event(
            db=db,
            event_type="task.create",
            resource_type="task",
            action=AuditAction.CREATE,
            user_id=admin_user.id,
            resource_id=str(uuid.uuid4()),
        )
        db.commit()
        db.refresh(log)

        # Tamper with the checksum
        log.checksum = "0" * 64

        assert AuditService.verify_checksum(log) is False


@pytest.fixture
def regular_user(db):
    """Create a non-admin user."""
    user = User(
        id=str(uuid.uuid4()),
        email="regular@example.com",
        name="Regular User",
        role_id="00000000-0000-0000-0000-000000000003",
        is_active=True,
        is_system_admin=False,
    )
    db.add(user)
    db.commit()
    return user


@pytest.fixture
def regular_user_token(client, mock_redis, regular_user):
    """Create an access token for the regular user and store it in mock_redis."""
    from app.core.security import create_access_token, create_token_payload

    token_data = create_token_payload(
        user_id=regular_user.id,
        email=regular_user.email,
        role="engineer",
        department_id=None,
        is_system_admin=False,
    )
    token = create_access_token(token_data)
    # Store the token under the user's session key with a 900-second expiry.
    mock_redis.setex(f"session:{regular_user.id}", 900, token)
    return token


class TestAuditAPI:
    """Tests for Audit API endpoints."""

    def test_list_audit_logs_requires_admin(self, client, regular_user_token):
        """Non-admin users get 403 from the audit log listing."""
        response = client.get(
            "/api/audit-logs",
            headers={"Authorization": f"Bearer {regular_user_token}"},
        )

        assert response.status_code == 403
        assert "Admin access required" in response.json()["detail"]

    def test_list_audit_logs(self, client, admin_token, db):
        """An admin can list audit logs and sees at least the inserted rows."""
        # Create some audit logs
        for _ in range(3):
            _add_audit_log(
                db,
                event_type="task.create",
                resource_type="task",
                action="create",
            )
        db.commit()

        response = client.get(
            "/api/audit-logs",
            headers={"Authorization": f"Bearer {admin_token}"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["total"] >= 3
        assert len(data["logs"]) >= 3

    def test_list_audit_logs_with_filters(self, client, admin_token, db):
        """Filtering by resource_type returns only matching logs."""
        # Create logs with different resource types
        for resource_type in ["task", "project", "task"]:
            _add_audit_log(
                db,
                event_type=f"{resource_type}.create",
                resource_type=resource_type,
                action="create",
            )
        db.commit()

        response = client.get(
            "/api/audit-logs?resource_type=project",
            headers={"Authorization": f"Bearer {admin_token}"},
        )

        assert response.status_code == 200
        data = response.json()
        # all() is vacuously true on an empty list, so first make sure the
        # project log inserted above actually came back.
        assert data["logs"], "filter returned no logs; expected the project log"
        assert all(log["resource_type"] == "project" for log in data["logs"])

    def test_get_resource_history(self, client, admin_token, db):
        """The resource history endpoint returns every log for that resource."""
        resource_id = str(uuid.uuid4())

        # Create multiple logs for the same resource
        history = [
            ("task.create", "create"),
            ("task.update", "update"),
            ("task.update", "update"),
        ]
        for event_type, action in history:
            _add_audit_log(
                db,
                event_type=event_type,
                resource_type="task",
                action=action,
                resource_id=resource_id,
            )
        db.commit()

        response = client.get(
            f"/api/audit-logs/resource/task/{resource_id}",
            headers={"Authorization": f"Bearer {admin_token}"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["total"] == 3
        assert all(log["resource_id"] == resource_id for log in data["logs"])

    def test_verify_integrity(self, client, admin_token, db):
        """Integrity verification finds no invalid rows for a valid log."""
        now = datetime.utcnow()

        # Create a valid log through the service so it has a real checksum
        AuditService.log_event(
            db=db,
            event_type="task.create",
            resource_type="task",
            action=AuditAction.CREATE,
            user_id=AUDIT_ACTOR_ID,
            resource_id=str(uuid.uuid4()),
        )
        db.commit()

        # Check a two-hour window centred on `now`.
        response = client.post(
            "/api/audit-logs/verify-integrity",
            headers={"Authorization": f"Bearer {admin_token}"},
            json={
                "start_date": (now - timedelta(hours=1)).isoformat(),
                "end_date": (now + timedelta(hours=1)).isoformat(),
            },
        )

        assert response.status_code == 200
        data = response.json()
        assert data["total_checked"] >= 1
        assert data["invalid_count"] == 0

    def test_acknowledge_alert(self, client, admin_token, db):
        """Acknowledging an alert marks it acknowledged and records by whom."""
        # Create a log and alert
        log = _add_audit_log(
            db,
            event_type="project.delete",
            resource_type="project",
            action="delete",
            sensitivity_level="high",
        )
        # Flush before creating the alert that references log.id.
        db.flush()

        alert = AuditAlert(
            id=str(uuid.uuid4()),
            audit_log_id=log.id,
            alert_type="project.delete",
            recipients=[AUDIT_ACTOR_ID],
            message="Test alert",
        )
        db.add(alert)
        db.commit()

        response = client.put(
            f"/api/audit-alerts/{alert.id}/acknowledge",
            headers={"Authorization": f"Bearer {admin_token}"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["is_acknowledged"] is True
        assert data["acknowledged_by"] is not None

    def test_export_csv(self, client, admin_token, db):
        """The export endpoint responds with a CSV attachment."""
        # Create some logs
        for _ in range(3):
            _add_audit_log(
                db,
                event_type="task.create",
                resource_type="task",
                action="create",
            )
        db.commit()

        response = client.get(
            "/api/audit-logs/export",
            headers={"Authorization": f"Bearer {admin_token}"},
        )

        assert response.status_code == 200
        assert "text/csv" in response.headers["content-type"]
        assert "attachment" in response.headers.get("content-disposition", "")