Security Validation (enhance-security-validation): - JWT secret validation with entropy checking and pattern detection - CSRF protection middleware with token generation/validation - Frontend CSRF token auto-injection for DELETE/PUT/PATCH requests - MIME type validation with magic bytes detection for file uploads Error Resilience (add-error-resilience): - React ErrorBoundary component with fallback UI and retry functionality - ErrorBoundaryWithI18n wrapper for internationalization support - Page-level and section-level error boundaries in App.tsx Query Performance (optimize-query-performance): - Query monitoring utility with threshold warnings - N+1 query fixes using joinedload/selectinload - Optimized project members, tasks, and subtasks endpoints Bug Fixes: - WebSocket session management (P0): Return primitives instead of ORM objects - LIKE query injection (P1): Escape special characters in search queries Tests: 543 backend tests, 56 frontend tests passing Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
315 lines
12 KiB
Python
315 lines
12 KiB
Python
"""
|
|
MIME Type Validation Service using Magic Bytes Detection.
|
|
|
|
This module provides file content type validation by examining
|
|
the actual file content (magic bytes) rather than trusting
|
|
the file extension or Content-Type header.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional, Tuple, Dict, Set, BinaryIO
|
|
from io import BytesIO
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MimeValidationError(Exception):
    """Base error signalling that a file failed MIME type validation."""
|
|
|
|
|
|
class FileMismatchError(MimeValidationError):
    """Signals that a file's extension disagrees with its actual content type."""
|
|
|
|
|
|
class UnsupportedMimeError(MimeValidationError):
    """Signals that a file's MIME type is not one of the supported types."""
|
|
|
|
|
|
# Leading-byte ("magic number") signatures for common file types.
# Layout: { byte_prefix: (mime_type, {extensions}) }
# Entries are kept in their original order because the dict is scanned
# front to back when sniffing content.
MAGIC_SIGNATURES: Dict[bytes, Tuple[str, Set[str]]] = {
    # --- Images ---
    b"\xFF\xD8\xFF": ("image/jpeg", {"jpg", "jpeg", "jpe"}),
    b"\x89PNG\r\n\x1a\n": ("image/png", {"png"}),
    b"GIF87a": ("image/gif", {"gif"}),
    b"GIF89a": ("image/gif", {"gif"}),
    # RIFF is a generic container; WebP additionally carries "WEBP" after it.
    b"RIFF": ("image/webp", {"webp"}),
    b"BM": ("image/bmp", {"bmp"}),

    # --- PDF ---
    b"%PDF": ("application/pdf", {"pdf"}),

    # --- ZIP containers (modern Office / OpenDocument / JAR) ---
    b"PK\x03\x04": (
        "application/zip",
        {"zip", "docx", "xlsx", "pptx", "odt", "ods", "odp", "jar"},
    ),

    # --- Compound Document (legacy Office) ---
    b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1": (
        "application/msword",
        {"doc", "xls", "ppt", "msi"},
    ),

    # --- Archives ---
    b"\x1f\x8b": ("application/gzip", {"gz", "tgz"}),
    b"\x42\x5a\x68": ("application/x-bzip2", {"bz2"}),
    b"\x37\x7A\xBC\xAF\x27\x1C": ("application/x-7z-compressed", {"7z"}),
    b"Rar!\x1a\x07": ("application/x-rar-compressed", {"rar"}),

    # --- Text/data: weak signatures, the extension is usually the fallback ---
    b"<?xml": ("application/xml", {"xml", "svg"}),
    b"{": ("application/json", {"json"}),  # JSON object
    b"[": ("application/json", {"json"}),  # JSON array

    # --- Executables (dangerous; meant to be blocked) ---
    b"MZ": ("application/x-executable", {"exe", "dll", "com", "scr"}),
    b"\x7fELF": ("application/x-executable", {"elf", "so", "bin"}),
}
|
|
|
|
# For each lowercase extension (no dot), the MIME types considered acceptable.
EXTENSION_TO_MIME: Dict[str, Set[str]] = {
    # --- Images ---
    "jpg": {"image/jpeg"},
    "jpeg": {"image/jpeg"},
    "jpe": {"image/jpeg"},
    "png": {"image/png"},
    "gif": {"image/gif"},
    "bmp": {"image/bmp"},
    "webp": {"image/webp"},
    "svg": {"image/svg+xml", "application/xml", "text/xml"},

    # --- Documents ---
    "pdf": {"application/pdf"},
    "doc": {"application/msword"},
    "docx": {
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/zip",
    },
    "xls": {"application/vnd.ms-excel", "application/msword"},
    "xlsx": {
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/zip",
    },
    "ppt": {"application/vnd.ms-powerpoint", "application/msword"},
    "pptx": {
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "application/zip",
    },

    # --- Text ---
    "txt": {"text/plain"},
    "csv": {"text/csv", "text/plain"},
    "json": {"application/json", "text/plain"},
    "xml": {"application/xml", "text/xml", "text/plain"},
    "yaml": {"application/yaml", "text/plain"},
    "yml": {"application/yaml", "text/plain"},

    # --- Archives ---
    "zip": {"application/zip"},
    "rar": {"application/x-rar-compressed"},
    "7z": {"application/x-7z-compressed"},
    "tar": {"application/x-tar"},
    "gz": {"application/gzip"},
}
|
|
|
|
# Executable/script MIME types that are rejected unconditionally.
BLOCKED_MIME_TYPES: Set[str] = {
    "application/x-executable",
    "application/x-msdownload",
    "application/x-msdos-program",
    "application/x-sh",
    "application/x-csh",
    "application/x-dosexec",
}
|
|
|
|
# Named groups of MIME types that a service instance can opt into.
ALLOWED_MIME_CATEGORIES: Dict[str, Set[str]] = {
    "images": {
        "image/jpeg",
        "image/png",
        "image/gif",
        "image/bmp",
        "image/webp",
        "image/svg+xml",
    },
    "documents": {
        "application/pdf",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-powerpoint",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "text/plain",
        "text/csv",
    },
    "archives": {
        "application/zip",
        "application/x-rar-compressed",
        "application/x-7z-compressed",
        "application/gzip",
        "application/x-tar",
    },
    "data": {
        "application/json",
        "application/xml",
        "text/xml",
        "application/yaml",
        "text/plain",
    },
}
|
|
|
|
|
|
class MimeValidationService:
    """Service for validating file MIME types using magic bytes.

    The set of acceptable MIME types is computed once at construction from
    ALLOWED_MIME_CATEGORIES. Content is sniffed against MAGIC_SIGNATURES and
    cross-checked against the declared extension via EXTENSION_TO_MIME.
    """

    def __init__(
        self,
        allowed_categories: Optional[Set[str]] = None,
        bypass_for_trusted: bool = False
    ):
        """
        Initialize the MIME validation service.

        Args:
            allowed_categories: Set of allowed MIME categories ('images', 'documents', etc.)
                               If None, all categories are allowed. Names that are
                               not keys of ALLOWED_MIME_CATEGORIES contribute nothing
                               and are reported with a warning.
            bypass_for_trusted: If True, validation can be bypassed for trusted sources.
        """
        self.bypass_for_trusted = bypass_for_trusted

        if allowed_categories is None:
            categories = set(ALLOWED_MIME_CATEGORIES)
        else:
            categories = set(allowed_categories)
            unknown = categories.difference(ALLOWED_MIME_CATEGORIES)
            if unknown:
                # A typo here silently shrinks the allow-list, so make it visible.
                logger.warning(
                    "Ignoring unknown MIME categories: %s",
                    sorted(str(name) for name in unknown)
                )

        # Flatten the selected categories into one lookup set.
        self.allowed_mime_types: Set[str] = set()
        for category in categories:
            self.allowed_mime_types.update(ALLOWED_MIME_CATEGORIES.get(category, ()))

    def detect_mime_type(self, file_content: bytes) -> Optional[str]:
        """
        Detect MIME type from file content using magic bytes.

        Args:
            file_content: The raw file bytes (only the leading bytes are inspected)

        Returns:
            Detected MIME type or None if unknown (including empty input or
            input shorter than 2 bytes)
        """
        if not file_content or len(file_content) < 2:
            return None

        # Signatures are tried in the order they are declared.
        for magic_bytes, (mime_type, _) in MAGIC_SIGNATURES.items():
            if not file_content.startswith(magic_bytes):
                continue

            # RIFF is a generic container (WAV, AVI, ...). Only treat it as WebP
            # when the form type at bytes 8..12 says so. Slicing a short buffer
            # yields fewer than 4 bytes, so truncated RIFF data is not
            # misreported as WebP.
            if magic_bytes == b'RIFF' and file_content[8:12] != b'WEBP':
                continue

            return mime_type

        return None

    def validate_file_content(
        self,
        file_content: bytes,
        declared_extension: str,
        declared_mime_type: Optional[str] = None,
        trusted_source: bool = False
    ) -> Tuple[bool, str, Optional[str]]:
        """
        Validate file content against the declared extension.

        Args:
            file_content: The raw file bytes
            declared_extension: The file extension (a leading dot and surrounding
                whitespace are tolerated; matching is case-insensitive)
            declared_mime_type: The Content-Type header value (optional). It is
                only echoed back on the trusted-source bypass path; it is not
                used to decide validity.
            trusted_source: If True and bypass_for_trusted is enabled, skip validation

        Returns:
            Tuple of (is_valid, detected_mime_type, error_message)
        """
        # Bypass for trusted sources if configured
        if trusted_source and self.bypass_for_trusted:
            logger.debug("MIME validation bypassed for trusted source")
            return True, declared_mime_type or 'application/octet-stream', None

        detected_mime = self.detect_mime_type(file_content)
        ext_lower = (declared_extension or '').strip().lstrip('.').lower()
        expected_mimes = EXTENSION_TO_MIME.get(ext_lower)

        # Dangerous executables are rejected regardless of the claimed extension.
        if detected_mime in BLOCKED_MIME_TYPES:
            logger.warning(
                "Blocked dangerous file type detected: %s (claimed extension: %s)",
                detected_mime, ext_lower
            )
            return False, detected_mime, "File type not allowed for security reasons"

        # No signature matched (typical for plain-text formats): decide from
        # the extension alone.
        if detected_mime is None:
            if expected_mimes is None:
                # NOTE(review): unknown extension + unrecognised content is still
                # accepted (fail-open), as before. Confirm whether restricted
                # instances should reject here instead.
                logger.warning(
                    "Could not detect MIME type for file with extension: %s",
                    ext_lower
                )
                return True, 'application/octet-stream', None

            permitted = expected_mimes & self.allowed_mime_types
            if permitted:
                logger.debug(
                    "MIME detection inconclusive for extension %s, allowing based on extension",
                    ext_lower
                )
                # min() gives a stable answer; set iteration order varies
                # between interpreter runs.
                return True, min(permitted), None

            # Known extension, but none of its MIME types is enabled for this
            # instance: honour the allow-list instead of falling through.
            logger.warning(
                "Extension '%s' maps only to MIME types that are not allowed",
                ext_lower
            )
            return False, min(expected_mimes), f"Unsupported file type: {ext_lower}"

        # Check if detected MIME is in allowed set
        if detected_mime not in self.allowed_mime_types:
            logger.warning(
                "Unsupported MIME type detected: %s (extension: %s)",
                detected_mime, ext_lower
            )
            return False, detected_mime, f"Unsupported file type: {detected_mime}"

        # Verify extension matches detected MIME type. ZIP-based Office formats
        # need no special case: their EXTENSION_TO_MIME entries already list
        # 'application/zip'. Extensions without an entry are not cross-checked.
        if expected_mimes is not None and detected_mime not in expected_mimes:
            logger.warning(
                "File type mismatch: extension '%s' but detected '%s'",
                ext_lower, detected_mime
            )
            return False, detected_mime, f"File type mismatch: extension indicates {ext_lower} but content is {detected_mime}"

        return True, detected_mime, None

    async def validate_upload_file(
        self,
        file_content: bytes,
        filename: str,
        content_type: Optional[str] = None,
        trusted_source: bool = False
    ) -> Tuple[bool, str, Optional[str]]:
        """
        Validate an uploaded file.

        Args:
            file_content: The raw file bytes
            filename: The uploaded filename (None or empty is treated as
                having no extension)
            content_type: The Content-Type header value
            trusted_source: If True and bypass is enabled, skip validation

        Returns:
            Tuple of (is_valid, detected_mime_type, error_message)
        """
        # Everything after the last dot is the extension; no dot means none.
        name = filename or ''
        extension = name.rsplit('.', 1)[-1] if '.' in name else ''

        return self.validate_file_content(
            file_content=file_content,
            declared_extension=extension,
            declared_mime_type=content_type,
            trusted_source=trusted_source
        )
|
|
|
|
|
|
# Shared module-level instance using the defaults: every category allowed,
# no bypass for trusted sources.
mime_validation_service = MimeValidationService(
    allowed_categories=None,
    bypass_for_trusted=False,
)
|