"""
Tests for the file encryption functionality.

Tests cover:
- Encryption service (key generation, encrypt/decrypt)
- Encryption key management API
- Attachment upload with encryption
- Attachment download with decryption
"""

import base64
import secrets
import uuid
from io import BytesIO
from unittest.mock import patch, MagicMock

import pytest

from app.services.encryption_service import (
    EncryptionService,
    encryption_service,
    MasterKeyNotConfiguredError,
    DecryptionError,
    KEY_SIZE,
    NONCE_SIZE,
)

# Size in bytes of the GCM authentication tag appended to each ciphertext.
GCM_TAG_SIZE = 16


# ---------------------------------------------------------------------------
# Shared fixtures and helpers
# ---------------------------------------------------------------------------

@pytest.fixture
def mock_master_key():
    """Generate a valid test master key (urlsafe base64 of 32 random bytes)."""
    return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode()


@pytest.fixture
def service_with_key(mock_master_key):
    """Create an encryption service with a mock master key.

    The fixture yields from inside the ``patch`` context, so the patched
    ``settings`` object stays in place for the whole duration of the test.
    """
    with patch('app.services.encryption_service.settings') as mock_settings:
        mock_settings.ENCRYPTION_MASTER_KEY = mock_master_key
        service = EncryptionService()
        yield service


def _settings_env(master_key):
    """Return a ``patch.dict`` context manager for ``os.environ``.

    Sets ``JWT_SECRET_KEY`` to a fixed test value and ``ENCRYPTION_MASTER_KEY``
    to *master_key* while the context is active.
    """
    return patch.dict('os.environ', {
        'JWT_SECRET_KEY': 'test-secret-key-that-is-valid',
        'ENCRYPTION_MASTER_KEY': master_key,
    })


def _post_test_attachment(client, task_id, token, csrf_token):
    """POST a small PDF-typed file to the task attachments endpoint.

    Args:
        client: Test client used to issue the request.
        task_id: ID of the task the attachment is uploaded to.
        token: Bearer token sent in the ``Authorization`` header.
        csrf_token: Value sent in the ``X-CSRF-Token`` header.

    Returns:
        The response object returned by ``client.post``.
    """
    content = b"Test file content"
    files = {"file": ("test.pdf", BytesIO(content), "application/pdf")}
    return client.post(
        f"/api/tasks/{task_id}/attachments",
        headers={"Authorization": f"Bearer {token}", "X-CSRF-Token": csrf_token},
        files=files,
    )


class TestEncryptionService:
    """Tests for the encryption service."""

    def test_generate_key(self, service_with_key):
        """Test that generate_key produces a KEY_SIZE-byte ``bytes`` key."""
        key = service_with_key.generate_key()
        assert len(key) == KEY_SIZE
        assert isinstance(key, bytes)

    def test_generate_key_uniqueness(self, service_with_key):
        """Test that each generated key is unique."""
        keys = [service_with_key.generate_key() for _ in range(10)]
        unique_keys = set(keys)
        assert len(unique_keys) == 10

    def test_encrypt_decrypt_key(self, service_with_key):
        """Test encryption and decryption of a file encryption key."""
        # Generate a key to encrypt
        original_key = service_with_key.generate_key()

        # Encrypt the key; the result must not simply be the encoded plaintext
        encrypted_key = service_with_key.encrypt_key(original_key)
        assert isinstance(encrypted_key, str)
        assert encrypted_key != base64.urlsafe_b64encode(original_key).decode()

        # Decrypt the key
        decrypted_key = service_with_key.decrypt_key(encrypted_key)
        assert decrypted_key == original_key

    def test_encrypt_decrypt_file(self, service_with_key):
        """Test file encryption and decryption."""
        # Create test file content
        original_content = b"This is a test file content for encryption."
        file_obj = BytesIO(original_content)

        # Generate encryption key
        key = service_with_key.generate_key()

        # Encrypt
        encrypted_content = service_with_key.encrypt_file(file_obj, key)
        assert encrypted_content != original_content
        assert len(encrypted_content) > len(original_content)  # Due to nonce and tag

        # Decrypt
        encrypted_file = BytesIO(encrypted_content)
        decrypted_content = service_with_key.decrypt_file(encrypted_file, key)
        assert decrypted_content == original_content

    def test_encrypt_decrypt_bytes(self, service_with_key):
        """Test bytes encryption and decryption convenience methods."""
        original_data = b"Test data for encryption"
        key = service_with_key.generate_key()

        # Encrypt
        encrypted_data = service_with_key.encrypt_bytes(original_data, key)
        assert encrypted_data != original_data

        # Decrypt
        decrypted_data = service_with_key.decrypt_bytes(encrypted_data, key)
        assert decrypted_data == original_data

    def test_encrypt_large_file(self, service_with_key):
        """Test encryption of a larger file (1MB)."""
        # Create 1MB of random data
        original_content = secrets.token_bytes(1024 * 1024)
        file_obj = BytesIO(original_content)
        key = service_with_key.generate_key()

        # Encrypt
        encrypted_content = service_with_key.encrypt_file(file_obj, key)

        # Decrypt
        encrypted_file = BytesIO(encrypted_content)
        decrypted_content = service_with_key.decrypt_file(encrypted_file, key)
        assert decrypted_content == original_content

    def test_decrypt_with_wrong_key(self, service_with_key):
        """Test that decryption fails with wrong key."""
        original_content = b"Secret content"
        file_obj = BytesIO(original_content)
        key1 = service_with_key.generate_key()
        key2 = service_with_key.generate_key()

        # Encrypt with key1
        encrypted_content = service_with_key.encrypt_file(file_obj, key1)

        # Try to decrypt with key2
        encrypted_file = BytesIO(encrypted_content)
        with pytest.raises(DecryptionError):
            service_with_key.decrypt_file(encrypted_file, key2)

    def test_decrypt_corrupted_data(self, service_with_key):
        """Test that decryption fails with corrupted data."""
        original_content = b"Secret content"
        file_obj = BytesIO(original_content)
        key = service_with_key.generate_key()

        # Encrypt
        encrypted_content = service_with_key.encrypt_file(file_obj, key)

        # Corrupt the encrypted data
        corrupted = bytearray(encrypted_content)
        corrupted[20] ^= 0xFF  # Flip some bits
        corrupted_content = bytes(corrupted)

        # Try to decrypt
        encrypted_file = BytesIO(corrupted_content)
        with pytest.raises(DecryptionError):
            service_with_key.decrypt_file(encrypted_file, key)

    def test_is_encryption_available_with_key(self, mock_master_key):
        """Test is_encryption_available returns True when key is configured."""
        with patch('app.services.encryption_service.settings') as mock_settings:
            mock_settings.ENCRYPTION_MASTER_KEY = mock_master_key
            service = EncryptionService()
            assert service.is_encryption_available() is True

    def test_is_encryption_available_without_key(self):
        """Test is_encryption_available returns False when key is not configured."""
        with patch('app.services.encryption_service.settings') as mock_settings:
            mock_settings.ENCRYPTION_MASTER_KEY = None
            service = EncryptionService()
            assert service.is_encryption_available() is False

    def test_master_key_not_configured_error(self):
        """Test that operations fail when master key is not configured."""
        with patch('app.services.encryption_service.settings') as mock_settings:
            mock_settings.ENCRYPTION_MASTER_KEY = None
            service = EncryptionService()
            key = secrets.token_bytes(32)
            with pytest.raises(MasterKeyNotConfiguredError):
                service.encrypt_key(key)

    def test_encrypted_key_format(self, service_with_key):
        """Test that encrypted key is valid base64."""
        key = service_with_key.generate_key()
        encrypted_key = service_with_key.encrypt_key(key)

        # Should be valid base64
        decoded = base64.urlsafe_b64decode(encrypted_key)

        # Should contain nonce + ciphertext + tag
        assert len(decoded) >= NONCE_SIZE + KEY_SIZE + GCM_TAG_SIZE


class TestEncryptionServiceStreaming:
    """Tests for streaming encryption (for large files)."""

    def test_streaming_encrypt_decrypt(self, service_with_key):
        """Test streaming encryption and decryption."""
        # Create test content
        original_content = b"Test content for streaming encryption. " * 1000
        file_obj = BytesIO(original_content)
        key = service_with_key.generate_key()

        # Encrypt using streaming
        encrypted_chunks = list(service_with_key.encrypt_file_streaming(file_obj, key))
        encrypted_content = b''.join(encrypted_chunks)

        # Decrypt using streaming
        encrypted_file = BytesIO(encrypted_content)
        decrypted_chunks = list(service_with_key.decrypt_file_streaming(encrypted_file, key))
        decrypted_content = b''.join(decrypted_chunks)

        assert decrypted_content == original_content


class TestEncryptionKeyValidation:
    """Tests for encryption key validation in config."""

    def test_valid_master_key(self):
        """Test that a valid master key passes validation."""
        from app.core.config import Settings

        valid_key = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode()

        # This should not raise
        with _settings_env(valid_key):
            settings = Settings()
            assert settings.ENCRYPTION_MASTER_KEY == valid_key

    def test_invalid_master_key_length(self):
        """Test that an invalid length master key fails validation."""
        from app.core.config import Settings

        # 16 bytes instead of 32
        invalid_key = base64.urlsafe_b64encode(secrets.token_bytes(16)).decode()

        with _settings_env(invalid_key):
            with pytest.raises(ValueError, match="must be a base64-encoded 32-byte key"):
                Settings()

    def test_invalid_master_key_format(self):
        """Test that an invalid format master key fails validation."""
        from app.core.config import Settings
        from pydantic import ValidationError

        invalid_key = "not-valid-base64!@#$"

        with _settings_env(invalid_key):
            with pytest.raises(ValidationError, match="ENCRYPTION_MASTER_KEY"):
                Settings()

    def test_empty_master_key_allowed(self):
        """Test that empty master key is allowed (encryption disabled)."""
        from app.core.config import Settings

        with _settings_env(''):
            settings = Settings()
            assert settings.ENCRYPTION_MASTER_KEY is None


class TestConfidentialProjectUpload:
    """Tests for file upload on confidential projects."""

    @pytest.fixture
    def test_user(self, db):
        """Create and commit a test user."""
        from app.models import User

        user = User(
            id=str(uuid.uuid4()),
            email="testuser_enc@example.com",
            name="Test User Encryption",
            role_id="00000000-0000-0000-0000-000000000003",
            is_active=True,
            is_system_admin=False,
        )
        db.add(user)
        db.commit()
        return user

    @pytest.fixture
    def test_user_token(self, client, mock_redis, test_user):
        """Get a token for test user and store it under the user's session key."""
        from app.core.security import create_access_token, create_token_payload

        token_data = create_token_payload(
            user_id=test_user.id,
            email=test_user.email,
            role="engineer",
            department_id=None,
            is_system_admin=False,
        )
        token = create_access_token(token_data)
        mock_redis.setex(f"session:{test_user.id}", 900, token)
        return token

    @pytest.fixture
    def test_user_csrf_token(self, test_user):
        """Generate a CSRF token for the test user."""
        from app.core.security import generate_csrf_token

        return generate_csrf_token(test_user.id)

    @pytest.fixture
    def test_space(self, db, test_user):
        """Create and commit a test space owned by the test user."""
        from app.models import Space

        space = Space(
            id=str(uuid.uuid4()),
            name="Test Space Encryption",
            description="Test space for encryption tests",
            owner_id=test_user.id,
        )
        db.add(space)
        db.commit()
        return space

    @pytest.fixture
    def confidential_project(self, db, test_space, test_user):
        """Create and commit a confidential test project."""
        from app.models import Project

        project = Project(
            id=str(uuid.uuid4()),
            space_id=test_space.id,
            title="Confidential Project",
            description="Test confidential project",
            owner_id=test_user.id,
            security_level="confidential",
        )
        db.add(project)
        db.commit()
        return project

    @pytest.fixture
    def test_task(self, db, confidential_project, test_user):
        """Create and commit a test task in the confidential project."""
        from app.models import Task

        task = Task(
            id=str(uuid.uuid4()),
            project_id=confidential_project.id,
            title="Test Task Encryption",
            description="Test task for encryption tests",
            created_by=test_user.id,
        )
        db.add(task)
        db.commit()
        return task

    def test_upload_confidential_project_encryption_unavailable(
        self, client, test_user_token, test_user_csrf_token, test_task, db
    ):
        """Test that uploading to confidential project returns 400 when encryption is unavailable."""
        # Mock encryption service to return False for is_encryption_available
        with patch('app.api.attachments.router.encryption_service') as mock_enc_service:
            mock_enc_service.is_encryption_available.return_value = False

            response = _post_test_attachment(
                client, test_task.id, test_user_token, test_user_csrf_token
            )

        assert response.status_code == 400
        detail = response.json()["detail"]
        assert "ENCRYPTION_MASTER_KEY" in detail
        assert "environment variable" in detail

    def test_upload_confidential_project_no_active_key(
        self, client, test_user_token, test_user_csrf_token, test_task, db
    ):
        """Test that uploading to confidential project returns 400 when no active encryption key exists."""
        from app.models import EncryptionKey

        # Make sure no active encryption keys exist.
        # `== True` builds a query filter expression here; it must not be
        # rewritten as `is True`, which would compare Python object identity.
        db.query(EncryptionKey).filter(EncryptionKey.is_active == True).update(  # noqa: E712
            {"is_active": False}
        )
        db.commit()

        # Mock encryption service to return True for is_encryption_available
        with patch('app.api.attachments.router.encryption_service') as mock_enc_service:
            mock_enc_service.is_encryption_available.return_value = True

            response = _post_test_attachment(
                client, test_task.id, test_user_token, test_user_csrf_token
            )

        assert response.status_code == 400
        detail = response.json()["detail"]
        assert "active encryption key" in detail
        assert "create" in detail.lower()