Files
PROJECT-CONTORL/backend/tests/test_encryption.py
beabigegg 4b5a9c1d0a feat: complete LOW priority code quality improvements
Backend:
- LOW-002: Add Query validation with max page size limits (100)
- LOW-003: Replace magic strings with TaskStatus.is_done flag
- LOW-004: Add 'creation' trigger type validation
- Add action_executor.py with UpdateFieldAction and AutoAssignAction

Frontend:
- LOW-005: Replace TypeScript 'any' with 'unknown' + type guards
- LOW-006: Add ConfirmModal component with A11Y support
- LOW-007: Add ToastContext for user feedback notifications
- LOW-009: Add Skeleton components (17 loading states replaced)
- LOW-010: Set up Vitest with 21 tests for ConfirmModal and Skeleton

Components updated:
- App.tsx, ProtectedRoute.tsx, Spaces.tsx, Projects.tsx, Tasks.tsx
- ProjectSettings.tsx, AuditPage.tsx, WorkloadPage.tsx, ProjectHealthPage.tsx
- Comments.tsx, AttachmentList.tsx, TriggerList.tsx, TaskDetailModal.tsx
- NotificationBell.tsx, BlockerDialog.tsx, CalendarView.tsx, WorkloadUserDetail.tsx

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 21:24:36 +08:00

418 lines
15 KiB
Python

"""
Tests for the file encryption functionality.
Tests cover:
- Encryption service (key generation, encrypt/decrypt)
- Encryption key management API
- Attachment upload with encryption
- Attachment download with decryption
"""
import pytest
import base64
import secrets
from io import BytesIO
from unittest.mock import patch, MagicMock
from app.services.encryption_service import (
EncryptionService,
encryption_service,
MasterKeyNotConfiguredError,
DecryptionError,
KEY_SIZE,
NONCE_SIZE,
)
class TestEncryptionService:
    """Unit tests for ``EncryptionService``: key generation, key wrapping, and
    file/bytes encrypt-decrypt round trips."""

    @pytest.fixture
    def mock_master_key(self):
        """Return a random 32-byte master key as urlsafe-base64 text."""
        raw_key = secrets.token_bytes(32)
        return base64.urlsafe_b64encode(raw_key).decode()

    @pytest.fixture
    def service_with_key(self, mock_master_key):
        """Yield an ``EncryptionService`` created while the module's settings
        object is patched to carry the test master key."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = mock_master_key
            # Yield from inside the patch so the mocked settings remain in
            # place for the whole duration of the requesting test.
            yield EncryptionService()

    def test_generate_key(self, service_with_key):
        """``generate_key`` returns a ``bytes`` value of length ``KEY_SIZE``."""
        generated = service_with_key.generate_key()

        assert len(generated) == KEY_SIZE
        assert isinstance(generated, bytes)

    def test_generate_key_uniqueness(self, service_with_key):
        """Ten consecutive ``generate_key`` calls yield ten distinct keys."""
        sample_count = 10
        distinct = {service_with_key.generate_key() for _ in range(sample_count)}

        assert len(distinct) == sample_count

    def test_encrypt_decrypt_key(self, service_with_key):
        """A file key survives an ``encrypt_key`` / ``decrypt_key`` round trip."""
        plain_key = service_with_key.generate_key()

        # Wrapping must produce text that is not merely the base64 of the key.
        wrapped = service_with_key.encrypt_key(plain_key)
        assert isinstance(wrapped, str)
        plain_as_text = base64.urlsafe_b64encode(plain_key).decode()
        assert wrapped != plain_as_text

        # Unwrapping restores the exact original key bytes.
        unwrapped = service_with_key.decrypt_key(wrapped)
        assert unwrapped == plain_key

    def test_encrypt_decrypt_file(self, service_with_key):
        """A small file-like object round-trips through file encryption."""
        plaintext = b"This is a test file content for encryption."
        file_key = service_with_key.generate_key()

        ciphertext = service_with_key.encrypt_file(BytesIO(plaintext), file_key)
        assert ciphertext != plaintext
        # The output is expected to be longer (nonce and tag are added).
        assert len(ciphertext) > len(plaintext)

        recovered = service_with_key.decrypt_file(BytesIO(ciphertext), file_key)
        assert recovered == plaintext

    def test_encrypt_decrypt_bytes(self, service_with_key):
        """The bytes-based convenience methods round-trip their input."""
        payload = b"Test data for encryption"
        data_key = service_with_key.generate_key()

        sealed = service_with_key.encrypt_bytes(payload, data_key)
        assert sealed != payload

        opened = service_with_key.decrypt_bytes(sealed, data_key)
        assert opened == payload

    def test_encrypt_large_file(self, service_with_key):
        """One MiB of random data round-trips through file encryption."""
        one_mib = 1024 * 1024
        plaintext = secrets.token_bytes(one_mib)
        file_key = service_with_key.generate_key()

        ciphertext = service_with_key.encrypt_file(BytesIO(plaintext), file_key)
        recovered = service_with_key.decrypt_file(BytesIO(ciphertext), file_key)

        assert recovered == plaintext

    def test_decrypt_with_wrong_key(self, service_with_key):
        """Decrypting with a different key raises ``DecryptionError``."""
        plaintext = b"Secret content"
        encrypting_key = service_with_key.generate_key()
        other_key = service_with_key.generate_key()

        ciphertext = service_with_key.encrypt_file(BytesIO(plaintext), encrypting_key)

        with pytest.raises(DecryptionError):
            service_with_key.decrypt_file(BytesIO(ciphertext), other_key)

    def test_decrypt_corrupted_data(self, service_with_key):
        """Decrypting tampered ciphertext raises ``DecryptionError``."""
        plaintext = b"Secret content"
        file_key = service_with_key.generate_key()
        ciphertext = service_with_key.encrypt_file(BytesIO(plaintext), file_key)

        # Invert every bit of the byte at offset 20 to simulate corruption.
        tampered = bytearray(ciphertext)
        tampered[20] ^= 0xFF

        with pytest.raises(DecryptionError):
            service_with_key.decrypt_file(BytesIO(bytes(tampered)), file_key)

    def test_is_encryption_available_with_key(self, mock_master_key):
        """``is_encryption_available`` is ``True`` when a master key is set."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = mock_master_key
            configured = EncryptionService()
            assert configured.is_encryption_available() is True

    def test_is_encryption_available_without_key(self):
        """``is_encryption_available`` is ``False`` when the master key is ``None``."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = None
            unconfigured = EncryptionService()
            assert unconfigured.is_encryption_available() is False

    def test_master_key_not_configured_error(self):
        """``encrypt_key`` raises ``MasterKeyNotConfiguredError`` without a master key."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = None
            unconfigured = EncryptionService()
            candidate_key = secrets.token_bytes(32)
            with pytest.raises(MasterKeyNotConfiguredError):
                unconfigured.encrypt_key(candidate_key)

    def test_encrypted_key_format(self, service_with_key):
        """An encrypted key decodes as urlsafe base64 and is long enough to
        hold nonce, ciphertext, and tag."""
        wrapped = service_with_key.encrypt_key(service_with_key.generate_key())

        # Decoding must succeed, i.e. the wrapped key is valid base64.
        raw = base64.urlsafe_b64decode(wrapped)

        gcm_tag_size = 16
        assert len(raw) >= NONCE_SIZE + KEY_SIZE + gcm_tag_size
class TestEncryptionServiceStreaming:
    """Tests for the chunked (streaming) encryption API intended for large files."""

    @pytest.fixture
    def mock_master_key(self):
        """Return a random 32-byte master key as urlsafe-base64 text."""
        raw_key = secrets.token_bytes(32)
        return base64.urlsafe_b64encode(raw_key).decode()

    @pytest.fixture
    def service_with_key(self, mock_master_key):
        """Yield an ``EncryptionService`` created while the module's settings
        object is patched to carry the test master key."""
        with patch('app.services.encryption_service.settings') as patched_settings:
            patched_settings.ENCRYPTION_MASTER_KEY = mock_master_key
            # Yield from inside the patch so the mocked settings remain in
            # place for the whole duration of the requesting test.
            yield EncryptionService()

    def test_streaming_encrypt_decrypt(self, service_with_key):
        """Content encrypted chunk by chunk decrypts back to the original bytes."""
        plaintext = b"Test content for streaming encryption. " * 1000
        stream_key = service_with_key.generate_key()

        # Drain the encrypting generator fully before starting decryption.
        cipher_parts = list(
            service_with_key.encrypt_file_streaming(BytesIO(plaintext), stream_key)
        )
        ciphertext = b''.join(cipher_parts)

        plain_parts = list(
            service_with_key.decrypt_file_streaming(BytesIO(ciphertext), stream_key)
        )
        recovered = b''.join(plain_parts)

        assert recovered == plaintext
class TestEncryptionKeyValidation:
    """Checks how ``Settings`` treats the ``ENCRYPTION_MASTER_KEY`` environment value."""

    def test_valid_master_key(self):
        """A base64-encoded 32-byte key is accepted and exposed unchanged."""
        from app.core.config import Settings

        good_key = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode()
        env = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': good_key,
        }

        # Constructing Settings must not raise for a well-formed key.
        with patch.dict('os.environ', env):
            loaded = Settings()
            assert loaded.ENCRYPTION_MASTER_KEY == good_key

    def test_invalid_master_key_length(self):
        """A base64 key that decodes to 16 bytes instead of 32 is rejected."""
        from app.core.config import Settings

        short_key = base64.urlsafe_b64encode(secrets.token_bytes(16)).decode()
        env = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': short_key,
        }

        with patch.dict('os.environ', env), pytest.raises(
            ValueError, match="must be a base64-encoded 32-byte key"
        ):
            Settings()

    def test_invalid_master_key_format(self):
        """A value that is not valid base64 fails with a ``ValidationError``
        whose message mentions ``ENCRYPTION_MASTER_KEY``."""
        from app.core.config import Settings
        from pydantic import ValidationError

        malformed_key = "not-valid-base64!@#$"
        env = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': malformed_key,
        }

        with patch.dict('os.environ', env), pytest.raises(
            ValidationError, match="ENCRYPTION_MASTER_KEY"
        ):
            Settings()

    def test_empty_master_key_allowed(self):
        """An empty string is accepted and surfaces as ``None`` (encryption disabled)."""
        from app.core.config import Settings

        env = {
            'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
            'ENCRYPTION_MASTER_KEY': '',
        }

        with patch.dict('os.environ', env):
            loaded = Settings()
            assert loaded.ENCRYPTION_MASTER_KEY is None
class TestConfidentialProjectUpload:
    """Tests for attachment upload on projects whose security level is
    ``"confidential"``.

    The fixtures build a user -> space -> confidential project -> task chain
    in the database; the tests then POST a file to the task's attachments
    endpoint with the router's ``encryption_service`` patched.
    """

    @staticmethod
    def _post_test_attachment(client, token, task_id):
        """POST a small in-memory file to the task's attachments endpoint.

        Args:
            client: Test client exposing a ``post`` method.
            token: Bearer token placed in the ``Authorization`` header.
            task_id: Identifier of the task receiving the attachment.

        Returns:
            The response object returned by ``client.post``.
        """
        content = b"Test file content"
        files = {"file": ("test.pdf", BytesIO(content), "application/pdf")}
        return client.post(
            f"/api/tasks/{task_id}/attachments",
            headers={"Authorization": f"Bearer {token}"},
            files=files,
        )

    @pytest.fixture
    def test_user(self, db):
        """Create, persist, and return an active non-admin test user."""
        from app.models import User
        import uuid as uuid_module

        user = User(
            id=str(uuid_module.uuid4()),
            email="testuser_enc@example.com",
            name="Test User Encryption",
            role_id="00000000-0000-0000-0000-000000000003",
            is_active=True,
            is_system_admin=False,
        )
        db.add(user)
        db.commit()
        return user

    @pytest.fixture
    def test_user_token(self, client, mock_redis, test_user):
        """Create an access token for the test user and register it as a session.

        The token is stored in the mocked Redis under ``session:<user id>``
        with a 900-second expiry.
        """
        from app.core.security import create_access_token, create_token_payload

        token_data = create_token_payload(
            user_id=test_user.id,
            email=test_user.email,
            role="engineer",
            department_id=None,
            is_system_admin=False,
        )
        token = create_access_token(token_data)
        mock_redis.setex(f"session:{test_user.id}", 900, token)
        return token

    @pytest.fixture
    def test_space(self, db, test_user):
        """Create, persist, and return a space owned by the test user."""
        from app.models import Space
        import uuid as uuid_module

        space = Space(
            id=str(uuid_module.uuid4()),
            name="Test Space Encryption",
            description="Test space for encryption tests",
            owner_id=test_user.id,
        )
        db.add(space)
        db.commit()
        return space

    @pytest.fixture
    def confidential_project(self, db, test_space, test_user):
        """Create, persist, and return a project with ``security_level="confidential"``."""
        from app.models import Project
        import uuid as uuid_module

        project = Project(
            id=str(uuid_module.uuid4()),
            space_id=test_space.id,
            title="Confidential Project",
            description="Test confidential project",
            owner_id=test_user.id,
            security_level="confidential",
        )
        db.add(project)
        db.commit()
        return project

    @pytest.fixture
    def test_task(self, db, confidential_project, test_user):
        """Create, persist, and return a task inside the confidential project."""
        from app.models import Task
        import uuid as uuid_module

        task = Task(
            id=str(uuid_module.uuid4()),
            project_id=confidential_project.id,
            title="Test Task Encryption",
            description="Test task for encryption tests",
            created_by=test_user.id,
        )
        db.add(task)
        db.commit()
        return task

    def test_upload_confidential_project_encryption_unavailable(
        self, client, test_user_token, test_task, db
    ):
        """Uploading returns 400 when the encryption service reports it is unavailable.

        The error detail must mention ``ENCRYPTION_MASTER_KEY`` and
        "environment variable".
        """
        # Patch the service object the attachments router uses so that it
        # reports encryption as unavailable.
        with patch('app.api.attachments.router.encryption_service') as mock_enc_service:
            mock_enc_service.is_encryption_available.return_value = False

            response = self._post_test_attachment(
                client, test_user_token, test_task.id
            )

            # Check the status before parsing so a non-JSON error body is
            # reported as a status mismatch rather than a decode failure.
            assert response.status_code == 400
            detail = response.json()["detail"]
            assert "ENCRYPTION_MASTER_KEY" in detail
            assert "environment variable" in detail

    def test_upload_confidential_project_no_active_key(
        self, client, test_user_token, test_task, db
    ):
        """Uploading returns 400 when encryption is available but no key is active.

        The error detail must mention "active encryption key" and, case
        insensitively, "create".
        """
        from app.models import EncryptionKey

        # Deactivate every currently active key so none is usable.
        # `== True` is intentional: SQLAlchemy overloads `==` to build the
        # SQL comparison, so a Python `is True` test would not work here.
        db.query(EncryptionKey).filter(EncryptionKey.is_active == True).update(  # noqa: E712
            {"is_active": False}
        )
        db.commit()

        # Patch the service object the attachments router uses so that it
        # reports encryption as available.
        with patch('app.api.attachments.router.encryption_service') as mock_enc_service:
            mock_enc_service.is_encryption_available.return_value = True

            response = self._post_test_attachment(
                client, test_user_token, test_task.id
            )

            assert response.status_code == 400
            detail = response.json()["detail"]
            assert "active encryption key" in detail
            assert "create" in detail.lower()