Files
PROJECT-CONTORL/backend/app/services/encryption_service.py
beabigegg 2d80a8384e feat: implement custom fields, gantt view, calendar view, and file encryption
- Custom Fields (FEAT-001):
  - CustomField and TaskCustomValue models with formula support
  - CRUD API for custom field management
  - Formula engine for calculated fields
  - Frontend: CustomFieldEditor, CustomFieldInput, ProjectSettings page
  - Task list API now includes custom_values
  - KanbanBoard displays custom field values

- Gantt View (FEAT-003):
  - TaskDependency model with FS/SS/FF/SF dependency types
  - Dependency CRUD API with cycle detection
  - start_date field added to tasks
  - GanttChart component with Frappe Gantt integration
  - Dependency type selector in UI

- Calendar View (FEAT-004):
  - CalendarView component with FullCalendar integration
  - Date range filtering API for tasks
  - Drag-and-drop date updates
  - View mode switching in Tasks page

- File Encryption (FEAT-010):
  - AES-256-GCM encryption service
  - EncryptionKey model with key rotation support
  - Admin API for key management
  - Encrypted upload/download for confidential projects

- Migrations: 011 (custom fields), 012 (encryption keys), 013 (task dependencies)
- Updated issues.md with completion status

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 23:39:12 +08:00

301 lines
8.9 KiB
Python

"""
Encryption service for AES-256-GCM file encryption.
This service handles:
- File encryption key generation and management
- Encrypting/decrypting file encryption keys with Master Key
- Streaming file encryption/decryption with AES-256-GCM
"""
import os
import base64
import secrets
import logging
from typing import BinaryIO, Tuple, Optional, Generator
from io import BytesIO
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
from app.core.config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cryptographic sizes, all expressed in bytes (bit width // 8).
KEY_SIZE = 256 // 8  # AES-256 key length
NONCE_SIZE = 96 // 8  # nonce length recommended for GCM
TAG_SIZE = 128 // 8  # GCM authentication tag length
CHUNK_SIZE = 64 << 10  # 64 KiB read per chunk by the streaming encryptor
class EncryptionError(Exception):
    """Root of this module's encryption exception hierarchy."""
class MasterKeyNotConfiguredError(EncryptionError):
    """Signals that no master key is configured, so keys cannot be wrapped or unwrapped."""
class DecryptionError(EncryptionError):
    """Signals that a key, file, or stream chunk could not be decrypted."""
class EncryptionService:
    """
    Service for file encryption using AES-256-GCM.

    Key hierarchy:
    1. Master Key (read from ``settings.ENCRYPTION_MASTER_KEY``) -> encrypts
       file encryption keys
    2. File Encryption Keys (stored in DB) -> encrypt actual files

    Two payload formats are produced:

    * Whole-buffer (``encrypt_file`` / ``encrypt_bytes``):
      ``nonce (12 bytes) + ciphertext + tag (16 bytes)``
    * Streaming (``encrypt_file_streaming``):
      ``0x01`` version byte, then per chunk
      ``size (4 bytes, little endian) + nonce + ciphertext + tag``,
      terminated by a zero size field.

    NOTE(review): in the streaming format every chunk is sealed independently
    with no associated data, and the zero-size terminator is not
    authenticated. Chunk reordering, duplication, or truncation at a chunk
    boundary is therefore not detected by decryption. Closing that gap needs
    a new format version, so it is deliberately not changed here.
    """

    def __init__(self):
        # Decoded master key bytes; filled in lazily by the master_key property.
        self._master_key: Optional[bytes] = None

    @property
    def master_key(self) -> bytes:
        """
        Get the master key, decoding it from settings on first access.

        The decoded bytes are cached on the instance, so settings are only
        consulted until the first successful load.

        Returns:
            The URL-safe-base64-decoded master key

        Raises:
            MasterKeyNotConfiguredError: If ENCRYPTION_MASTER_KEY is unset
                or empty.
        """
        if self._master_key is None:
            if not settings.ENCRYPTION_MASTER_KEY:
                raise MasterKeyNotConfiguredError(
                    "ENCRYPTION_MASTER_KEY is not configured. "
                    "File encryption is disabled."
                )
            # NOTE(review): the decoded length is not validated here; a key
            # of the wrong size only surfaces when AESGCM is constructed.
            self._master_key = base64.urlsafe_b64decode(settings.ENCRYPTION_MASTER_KEY)
        return self._master_key

    def is_encryption_available(self) -> bool:
        """
        Check if encryption is available (master key configured).

        Uses the same truthiness test as ``master_key`` so that an empty
        string is reported as "not available" instead of passing this check
        and then failing on first use.
        """
        return bool(settings.ENCRYPTION_MASTER_KEY)

    def generate_key(self) -> bytes:
        """
        Generate a new AES-256 encryption key.

        Returns:
            32-byte random key
        """
        return secrets.token_bytes(KEY_SIZE)

    def encrypt_key(self, key: bytes) -> str:
        """
        Encrypt a file encryption key using the Master Key.

        Args:
            key: The raw 32-byte file encryption key

        Returns:
            Base64-encoded encrypted key (nonce + ciphertext + tag)

        Raises:
            MasterKeyNotConfiguredError: If no master key is configured.
        """
        aesgcm = AESGCM(self.master_key)
        # Fresh random nonce for every wrap operation.
        nonce = secrets.token_bytes(NONCE_SIZE)
        ciphertext = aesgcm.encrypt(nonce, key, None)
        # The nonce is stored in front of the ciphertext (which already
        # carries the tag) so decrypt_key can recover it.
        encrypted_data = nonce + ciphertext
        return base64.urlsafe_b64encode(encrypted_data).decode('utf-8')

    def decrypt_key(self, encrypted_key: str) -> bytes:
        """
        Decrypt a file encryption key using the Master Key.

        Args:
            encrypted_key: Base64-encoded encrypted key

        Returns:
            The raw 32-byte file encryption key

        Raises:
            DecryptionError: On any failure, including malformed base64, a
                failed authentication check, or a missing master key (the
                master key lookup happens inside the guarded section). The
                underlying exception is available as ``__cause__``.
        """
        try:
            encrypted_data = base64.urlsafe_b64decode(encrypted_key)
            # Layout: nonce first, then ciphertext + tag.
            nonce = encrypted_data[:NONCE_SIZE]
            ciphertext = encrypted_data[NONCE_SIZE:]
            aesgcm = AESGCM(self.master_key)
            return aesgcm.decrypt(nonce, ciphertext, None)
        except Exception as e:
            logger.error("Failed to decrypt encryption key: %s", e)
            raise DecryptionError("Failed to decrypt file encryption key") from e

    def encrypt_file(self, file_content: BinaryIO, key: bytes) -> bytes:
        """
        Encrypt file content using AES-256-GCM.

        Reads the whole input into memory and encrypts it in one call, so it
        is intended for smaller files.
        The format is: nonce (12 bytes) + ciphertext + tag (16 bytes)

        Args:
            file_content: File-like object to encrypt
            key: 32-byte AES-256 key

        Returns:
            Encrypted bytes (nonce + ciphertext + tag)
        """
        plaintext = file_content.read()
        nonce = secrets.token_bytes(NONCE_SIZE)
        aesgcm = AESGCM(key)
        # AESGCM.encrypt appends the authentication tag to the ciphertext.
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)
        return nonce + ciphertext

    def decrypt_file(self, encrypted_content: BinaryIO, key: bytes) -> bytes:
        """
        Decrypt file content using AES-256-GCM.

        Args:
            encrypted_content: File-like object containing encrypted data
                in the layout produced by ``encrypt_file``
            key: 32-byte AES-256 key

        Returns:
            Decrypted bytes

        Raises:
            DecryptionError: If reading or decrypting fails for any reason;
                the underlying exception is available as ``__cause__``.
        """
        try:
            encrypted_data = encrypted_content.read()
            # Layout: nonce first, then ciphertext + tag.
            nonce = encrypted_data[:NONCE_SIZE]
            ciphertext = encrypted_data[NONCE_SIZE:]
            aesgcm = AESGCM(key)
            return aesgcm.decrypt(nonce, ciphertext, None)
        except Exception as e:
            logger.error("Failed to decrypt file: %s", e)
            raise DecryptionError(
                "Failed to decrypt file. The file may be corrupted or the key is incorrect."
            ) from e

    def encrypt_file_streaming(self, file_content: BinaryIO, key: bytes) -> Generator[bytes, None, None]:
        """
        Encrypt file content using AES-256-GCM with streaming.

        For large files, encrypts in chunks of up to CHUNK_SIZE plaintext
        bytes. Each chunk has its own random nonce.
        Stream layout: version byte (0x01), then per chunk
        chunk_size (4 bytes, little endian) + nonce (12 bytes) + ciphertext + tag,
        then a zero chunk_size as end marker.

        Args:
            file_content: File-like object to encrypt
            key: 32-byte AES-256 key

        Yields:
            Pieces of the encrypted stream, in order: the version byte, then
            for each chunk the size field followed by nonce + ciphertext,
            and finally the end marker.
        """
        aesgcm = AESGCM(key)
        # Header: version 1 of the streaming format.
        yield b'\x01'
        while True:
            chunk = file_content.read(CHUNK_SIZE)
            if not chunk:
                break
            nonce = secrets.token_bytes(NONCE_SIZE)
            ciphertext = aesgcm.encrypt(nonce, chunk, None)
            # The size field counts the nonce plus ciphertext (tag included).
            chunk_size = len(ciphertext) + NONCE_SIZE
            yield chunk_size.to_bytes(4, 'little')
            yield nonce + ciphertext
        # End marker: a size field of zero.
        yield b'\x00\x00\x00\x00'

    def decrypt_file_streaming(self, encrypted_content: BinaryIO, key: bytes) -> Generator[bytes, None, None]:
        """
        Decrypt file content using AES-256-GCM with streaming.

        Expects the layout written by ``encrypt_file_streaming``. Because
        this is a generator, nothing is read or validated until iteration
        starts.

        Args:
            encrypted_content: File-like object containing encrypted data
            key: 32-byte AES-256 key

        Yields:
            Decrypted chunks

        Raises:
            DecryptionError: If the version byte is unknown, the stream ends
                early, a chunk size is outside the range the encryptor can
                produce, or a chunk fails to decrypt.
        """
        aesgcm = AESGCM(key)
        # Bounds on a record (nonce + ciphertext + tag). The size field comes
        # from the file itself, so without an upper bound a corrupted or
        # hostile stream could request a read of up to 4 GiB.
        # If CHUNK_SIZE is ever lowered, keep this cap at the largest value
        # ever used so that older files stay readable.
        min_record_size = NONCE_SIZE + TAG_SIZE
        max_record_size = NONCE_SIZE + CHUNK_SIZE + TAG_SIZE

        version = encrypted_content.read(1)
        if version != b'\x01':
            raise DecryptionError("Unknown encryption format version")

        while True:
            size_bytes = encrypted_content.read(4)
            if len(size_bytes) < 4:
                raise DecryptionError("Unexpected end of file")
            chunk_size = int.from_bytes(size_bytes, 'little')
            # A zero size field is the end marker.
            if chunk_size == 0:
                break
            if not min_record_size <= chunk_size <= max_record_size:
                raise DecryptionError("Invalid encrypted chunk size")

            chunk = encrypted_content.read(chunk_size)
            if len(chunk) < chunk_size:
                raise DecryptionError("Unexpected end of file")

            nonce = chunk[:NONCE_SIZE]
            ciphertext = chunk[NONCE_SIZE:]
            try:
                plaintext = aesgcm.decrypt(nonce, ciphertext, None)
            except Exception as e:
                raise DecryptionError(f"Failed to decrypt chunk: {e}") from e
            # Yield outside the try block so an exception thrown into the
            # generator by its consumer is not reported as a decryption error.
            yield plaintext

    def encrypt_bytes(self, data: bytes, key: bytes) -> bytes:
        """
        Encrypt bytes directly (convenience wrapper around encrypt_file).

        Args:
            data: Bytes to encrypt
            key: 32-byte AES-256 key

        Returns:
            Encrypted bytes (nonce + ciphertext + tag)
        """
        return self.encrypt_file(BytesIO(data), key)

    def decrypt_bytes(self, encrypted_data: bytes, key: bytes) -> bytes:
        """
        Decrypt bytes directly (convenience wrapper around decrypt_file).

        Args:
            encrypted_data: Encrypted bytes
            key: 32-byte AES-256 key

        Returns:
            Decrypted bytes

        Raises:
            DecryptionError: If decryption fails.
        """
        return self.decrypt_file(BytesIO(encrypted_data), key)
# Shared module-level instance. Constructing it is cheap: __init__ only sets
# the cached master key to None, so settings are not read until first use.
encryption_service = EncryptionService()