Backend: - LOW-002: Add Query validation with max page size limits (100) - LOW-003: Replace magic strings with TaskStatus.is_done flag - LOW-004: Add 'creation' trigger type validation - Add action_executor.py with UpdateFieldAction and AutoAssignAction Frontend: - LOW-005: Replace TypeScript 'any' with 'unknown' + type guards - LOW-006: Add ConfirmModal component with A11Y support - LOW-007: Add ToastContext for user feedback notifications - LOW-009: Add Skeleton components (17 loading states replaced) - LOW-010: Setup Vitest with 21 tests for ConfirmModal and Skeleton Components updated: - App.tsx, ProtectedRoute.tsx, Spaces.tsx, Projects.tsx, Tasks.tsx - ProjectSettings.tsx, AuditPage.tsx, WorkloadPage.tsx, ProjectHealthPage.tsx - Comments.tsx, AttachmentList.tsx, TriggerList.tsx, TaskDetailModal.tsx - NotificationBell.tsx, BlockerDialog.tsx, CalendarView.tsx, WorkloadUserDetail.tsx 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
418 lines
15 KiB
Python
418 lines
15 KiB
Python
"""
|
|
Tests for the file encryption functionality.
|
|
|
|
Tests cover:
|
|
- Encryption service (key generation, encrypt/decrypt)
|
|
- Encryption key management API
|
|
- Attachment upload with encryption
|
|
- Attachment download with decryption
|
|
"""
|
|
import pytest
|
|
import base64
|
|
import secrets
|
|
from io import BytesIO
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from app.services.encryption_service import (
|
|
EncryptionService,
|
|
encryption_service,
|
|
MasterKeyNotConfiguredError,
|
|
DecryptionError,
|
|
KEY_SIZE,
|
|
NONCE_SIZE,
|
|
)
|
|
|
|
|
|
class TestEncryptionService:
    """Unit tests for EncryptionService key handling and payload round-trips."""

    @pytest.fixture
    def mock_master_key(self):
        """Return a random 32-byte master key, URL-safe base64 encoded as text."""
        raw_master = secrets.token_bytes(32)
        return base64.urlsafe_b64encode(raw_master).decode()

    @pytest.fixture
    def service_with_key(self, mock_master_key):
        """Yield an EncryptionService built while settings expose a master key.

        The settings patch stays active for the whole test because the
        service is yielded from inside the patch context.
        """
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = mock_master_key
            yield EncryptionService()

    def test_generate_key(self, service_with_key):
        """generate_key returns a bytes value whose length is KEY_SIZE."""
        generated = service_with_key.generate_key()
        assert len(generated) == KEY_SIZE
        assert isinstance(generated, bytes)

    def test_generate_key_uniqueness(self, service_with_key):
        """Ten consecutive generate_key calls yield ten distinct keys."""
        sample_count = 10
        distinct = {service_with_key.generate_key() for _ in range(sample_count)}
        assert len(distinct) == sample_count

    def test_encrypt_decrypt_key(self, service_with_key):
        """A key wrapped by encrypt_key is recovered unchanged by decrypt_key."""
        plain_key = service_with_key.generate_key()

        # Wrapping must produce text that differs from the plain base64 form.
        wrapped = service_with_key.encrypt_key(plain_key)
        assert isinstance(wrapped, str)
        plain_as_b64 = base64.urlsafe_b64encode(plain_key).decode()
        assert wrapped != plain_as_b64

        # Unwrapping restores the original key bytes.
        unwrapped = service_with_key.decrypt_key(wrapped)
        assert unwrapped == plain_key

    def test_encrypt_decrypt_file(self, service_with_key):
        """encrypt_file followed by decrypt_file returns the original content."""
        plaintext = b"This is a test file content for encryption."
        source = BytesIO(plaintext)
        file_key = service_with_key.generate_key()

        ciphertext = service_with_key.encrypt_file(source, file_key)
        assert ciphertext != plaintext
        # Output is expected to grow (original author's note: nonce and tag).
        assert len(ciphertext) > len(plaintext)

        recovered = service_with_key.decrypt_file(BytesIO(ciphertext), file_key)
        assert recovered == plaintext

    def test_encrypt_decrypt_bytes(self, service_with_key):
        """encrypt_bytes / decrypt_bytes round-trip an in-memory payload."""
        payload = b"Test data for encryption"
        data_key = service_with_key.generate_key()

        sealed = service_with_key.encrypt_bytes(payload, data_key)
        assert sealed != payload

        opened = service_with_key.decrypt_bytes(sealed, data_key)
        assert opened == payload

    def test_encrypt_large_file(self, service_with_key):
        """A 1 MiB random payload survives a file encrypt/decrypt round-trip."""
        plaintext = secrets.token_bytes(1024 * 1024)
        source = BytesIO(plaintext)
        file_key = service_with_key.generate_key()

        ciphertext = service_with_key.encrypt_file(source, file_key)

        recovered = service_with_key.decrypt_file(BytesIO(ciphertext), file_key)

        assert recovered == plaintext

    def test_decrypt_with_wrong_key(self, service_with_key):
        """Decrypting with a different key raises DecryptionError."""
        source = BytesIO(b"Secret content")

        encrypting_key = service_with_key.generate_key()
        other_key = service_with_key.generate_key()

        ciphertext = service_with_key.encrypt_file(source, encrypting_key)

        sealed_stream = BytesIO(ciphertext)
        with pytest.raises(DecryptionError):
            service_with_key.decrypt_file(sealed_stream, other_key)

    def test_decrypt_corrupted_data(self, service_with_key):
        """Decrypting a payload with one inverted byte raises DecryptionError."""
        source = BytesIO(b"Secret content")
        file_key = service_with_key.generate_key()

        ciphertext = service_with_key.encrypt_file(source, file_key)

        # Invert every bit of the byte at offset 20 to simulate corruption.
        tampered = bytearray(ciphertext)
        tampered[20] ^= 0xFF
        tampered_stream = BytesIO(bytes(tampered))

        with pytest.raises(DecryptionError):
            service_with_key.decrypt_file(tampered_stream, file_key)

    def test_is_encryption_available_with_key(self, mock_master_key):
        """is_encryption_available is True when settings carry a master key."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = mock_master_key
            svc = EncryptionService()
            assert svc.is_encryption_available() is True

    def test_is_encryption_available_without_key(self):
        """is_encryption_available is False when the master key is None."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = None
            svc = EncryptionService()
            assert svc.is_encryption_available() is False

    def test_master_key_not_configured_error(self):
        """encrypt_key raises MasterKeyNotConfiguredError without a master key."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = None
            svc = EncryptionService()

            candidate_key = secrets.token_bytes(32)
            with pytest.raises(MasterKeyNotConfiguredError):
                svc.encrypt_key(candidate_key)

    def test_encrypted_key_format(self, service_with_key):
        """The wrapped key decodes as URL-safe base64 of at least the minimum size."""
        wrapped = service_with_key.encrypt_key(service_with_key.generate_key())

        # Decoding must succeed, i.e. the wrapped key is valid base64.
        raw = base64.urlsafe_b64decode(wrapped)
        # Expected layout per the original test: nonce + ciphertext + GCM tag.
        gcm_tag_size = 16
        assert len(raw) >= NONCE_SIZE + KEY_SIZE + gcm_tag_size
|
|
|
|
|
|
class TestEncryptionServiceStreaming:
    """Tests for the chunk-yielding (streaming) encrypt/decrypt methods."""

    @pytest.fixture
    def mock_master_key(self):
        """Return a random 32-byte master key, URL-safe base64 encoded as text."""
        raw_master = secrets.token_bytes(32)
        return base64.urlsafe_b64encode(raw_master).decode()

    @pytest.fixture
    def service_with_key(self, mock_master_key):
        """Yield an EncryptionService built while settings expose a master key."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = mock_master_key
            yield EncryptionService()

    def test_streaming_encrypt_decrypt(self, service_with_key):
        """Joined streaming output decrypts back to the original content."""
        plaintext = b"Test content for streaming encryption. " * 1000
        source = BytesIO(plaintext)
        stream_key = service_with_key.generate_key()

        # Drain the encrypting generator and stitch its chunks together.
        sealed_parts = list(service_with_key.encrypt_file_streaming(source, stream_key))
        ciphertext = b''.join(sealed_parts)

        # Feed the joined ciphertext through the decrypting generator.
        sealed_stream = BytesIO(ciphertext)
        opened_parts = list(service_with_key.decrypt_file_streaming(sealed_stream, stream_key))
        recovered = b''.join(opened_parts)

        assert recovered == plaintext
|
|
|
|
|
|
class TestEncryptionKeyValidation:
    """Tests for how Settings validates the ENCRYPTION_MASTER_KEY variable."""

    def test_valid_master_key(self):
        """A base64-encoded 32-byte key is accepted and stored unchanged."""
        from app.core.config import Settings

        good_key = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode()

        env_overrides = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': good_key
        }
        # Constructing Settings must succeed without raising.
        with patch.dict('os.environ', env_overrides):
            loaded = Settings()
            assert loaded.ENCRYPTION_MASTER_KEY == good_key

    def test_invalid_master_key_length(self):
        """A base64-encoded 16-byte key is rejected with a ValueError."""
        from app.core.config import Settings

        # Deliberately half the expected size: 16 bytes rather than 32.
        short_key = base64.urlsafe_b64encode(secrets.token_bytes(16)).decode()

        env_overrides = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': short_key
        }
        with patch.dict('os.environ', env_overrides):
            with pytest.raises(ValueError, match="must be a base64-encoded 32-byte key"):
                Settings()

    def test_invalid_master_key_format(self):
        """A value that is not valid base64 is rejected with a ValidationError."""
        from app.core.config import Settings
        from pydantic import ValidationError

        malformed_key = "not-valid-base64!@#$"

        env_overrides = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': malformed_key
        }
        with patch.dict('os.environ', env_overrides):
            with pytest.raises(ValidationError, match="ENCRYPTION_MASTER_KEY"):
                Settings()

    def test_empty_master_key_allowed(self):
        """An empty variable is accepted and surfaces as None on Settings."""
        from app.core.config import Settings

        env_overrides = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': ''
        }
        with patch.dict('os.environ', env_overrides):
            loaded = Settings()
            assert loaded.ENCRYPTION_MASTER_KEY is None
|
|
|
|
|
|
class TestConfidentialProjectUpload:
    """Tests for attachment uploads to tasks in a confidential project."""

    @pytest.fixture
    def test_user(self, db):
        """Add a non-admin, active User to the session, commit, and return it."""
        from app.models import User
        import uuid as uuid_module

        account = User(
            id=str(uuid_module.uuid4()),
            email="testuser_enc@example.com",
            name="Test User Encryption",
            role_id="00000000-0000-0000-0000-000000000003",
            is_active=True,
            is_system_admin=False,
        )
        db.add(account)
        db.commit()
        return account

    @pytest.fixture
    def test_user_token(self, client, mock_redis, test_user):
        """Create an access token for test_user and store it in mock_redis."""
        from app.core.security import create_access_token, create_token_payload

        payload = create_token_payload(
            user_id=test_user.id,
            email=test_user.email,
            role="engineer",
            department_id=None,
            is_system_admin=False,
        )
        access_token = create_access_token(payload)
        # 900 is presumably the session TTL in seconds -- confirm against the app.
        mock_redis.setex(f"session:{test_user.id}", 900, access_token)
        return access_token

    @pytest.fixture
    def test_space(self, db, test_user):
        """Add a Space owned by test_user to the session, commit, and return it."""
        from app.models import Space
        import uuid as uuid_module

        workspace = Space(
            id=str(uuid_module.uuid4()),
            name="Test Space Encryption",
            description="Test space for encryption tests",
            owner_id=test_user.id,
        )
        db.add(workspace)
        db.commit()
        return workspace

    @pytest.fixture
    def confidential_project(self, db, test_space, test_user):
        """Add a Project with security_level "confidential", commit, and return it."""
        from app.models import Project
        import uuid as uuid_module

        secret_project = Project(
            id=str(uuid_module.uuid4()),
            space_id=test_space.id,
            title="Confidential Project",
            description="Test confidential project",
            owner_id=test_user.id,
            security_level="confidential",
        )
        db.add(secret_project)
        db.commit()
        return secret_project

    @pytest.fixture
    def test_task(self, db, confidential_project, test_user):
        """Add a Task inside the confidential project, commit, and return it."""
        from app.models import Task
        import uuid as uuid_module

        work_item = Task(
            id=str(uuid_module.uuid4()),
            project_id=confidential_project.id,
            title="Test Task Encryption",
            description="Test task for encryption tests",
            created_by=test_user.id,
        )
        db.add(work_item)
        db.commit()
        return work_item

    def test_upload_confidential_project_encryption_unavailable(
        self, client, test_user_token, test_task, db
    ):
        """Upload returns 400 when the encryption service reports unavailable."""
        # Replace the router's encryption service so it reports "not available".
        with patch('app.api.attachments.router.encryption_service') as fake_service:
            fake_service.is_encryption_available.return_value = False

            body = b"Test file content"
            upload = {"file": ("test.pdf", BytesIO(body), "application/pdf")}

            response = client.post(
                f"/api/tasks/{test_task.id}/attachments",
                headers={"Authorization": f"Bearer {test_user_token}"},
                files=upload,
            )

            assert response.status_code == 400
            # The error detail must point at the missing configuration.
            detail = response.json()["detail"]
            assert "ENCRYPTION_MASTER_KEY" in detail
            assert "environment variable" in detail

    def test_upload_confidential_project_no_active_key(
        self, client, test_user_token, test_task, db
    ):
        """Upload returns 400 when no EncryptionKey row is marked active."""
        from app.models import EncryptionKey

        # Deactivate every active key so the upload finds none.
        # `== True` is kept as-is: it builds the SQL filter expression.
        active_keys = db.query(EncryptionKey).filter(EncryptionKey.is_active == True)  # noqa: E712
        active_keys.update({"is_active": False})
        db.commit()

        # Replace the router's encryption service so it reports "available".
        with patch('app.api.attachments.router.encryption_service') as fake_service:
            fake_service.is_encryption_available.return_value = True

            body = b"Test file content"
            upload = {"file": ("test.pdf", BytesIO(body), "application/pdf")}

            response = client.post(
                f"/api/tasks/{test_task.id}/attachments",
                headers={"Authorization": f"Bearer {test_user_token}"},
                files=upload,
            )

            assert response.status_code == 400
            # The error detail must tell the caller to create an active key.
            detail = response.json()["detail"]
            assert "active encryption key" in detail
            assert "create" in detail.lower()
|