Security Validation (enhance-security-validation): - JWT secret validation with entropy checking and pattern detection - CSRF protection middleware with token generation/validation - Frontend CSRF token auto-injection for DELETE/PUT/PATCH requests - MIME type validation with magic bytes detection for file uploads Error Resilience (add-error-resilience): - React ErrorBoundary component with fallback UI and retry functionality - ErrorBoundaryWithI18n wrapper for internationalization support - Page-level and section-level error boundaries in App.tsx Query Performance (optimize-query-performance): - Query monitoring utility with threshold warnings - N+1 query fixes using joinedload/selectinload - Optimized project members, tasks, and subtasks endpoints Bug Fixes: - WebSocket session management (P0): Return primitives instead of ORM objects - LIKE query injection (P1): Escape special characters in search queries Tests: 543 backend tests, 56 frontend tests passing Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
403 lines
16 KiB
Python
403 lines
16 KiB
Python
"""
|
|
Tests for security validation features:
|
|
1. JWT secret validation (length and entropy)
|
|
2. CSRF protection
|
|
3. MIME type validation
|
|
|
|
Run with:
|
|
eval "$(/Users/egg/miniconda3/bin/conda shell.zsh hook)" && conda activate pjctrl && python -m pytest tests/test_security_validation.py -v
|
|
"""
|
|
|
|
import os
|
|
import pytest
|
|
import time
|
|
from unittest.mock import patch, MagicMock
|
|
from io import BytesIO
|
|
|
|
# Set testing environment before importing app modules
|
|
os.environ["TESTING"] = "true"
|
|
|
|
from fastapi import Request
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
class TestJWTSecretValidation:
|
|
"""Tests for JWT secret validation functionality."""
|
|
|
|
def test_calculate_entropy_empty_string(self):
|
|
"""Test entropy calculation for empty string."""
|
|
from app.core.security import calculate_entropy
|
|
assert calculate_entropy("") == 0.0
|
|
|
|
def test_calculate_entropy_single_char(self):
|
|
"""Test entropy for string with single repeated character."""
|
|
from app.core.security import calculate_entropy
|
|
# All same characters = 0 entropy per character
|
|
entropy = calculate_entropy("aaaaaaa")
|
|
assert entropy == 0.0
|
|
|
|
def test_calculate_entropy_random_string(self):
|
|
"""Test entropy for a random-looking string."""
|
|
from app.core.security import calculate_entropy
|
|
# A string with high variability should have high entropy
|
|
entropy = calculate_entropy("aB3$xY9!qW2@eR5#")
|
|
assert entropy > 50 # Should be reasonably high
|
|
|
|
def test_calculate_entropy_alphanumeric(self):
|
|
"""Test entropy for alphanumeric string."""
|
|
from app.core.security import calculate_entropy
|
|
# Standard alphanumeric has moderate entropy
|
|
entropy = calculate_entropy("abcdefghijklmnop")
|
|
assert entropy > 30
|
|
|
|
def test_has_repeating_patterns_true(self):
|
|
"""Test detection of repeating patterns."""
|
|
from app.core.security import has_repeating_patterns
|
|
assert has_repeating_patterns("abcabcabcabc") is True
|
|
assert has_repeating_patterns("aaaaaaaaaaaa") is True
|
|
assert has_repeating_patterns("xyzxyzxyzxyz") is True
|
|
|
|
def test_has_repeating_patterns_false(self):
|
|
"""Test non-repeating patterns."""
|
|
from app.core.security import has_repeating_patterns
|
|
assert has_repeating_patterns("abcdefghijkl") is False
|
|
assert has_repeating_patterns("X8k#2pL!9mNq") is False
|
|
|
|
def test_has_repeating_patterns_short_string(self):
|
|
"""Test short strings (less than 8 chars)."""
|
|
from app.core.security import has_repeating_patterns
|
|
assert has_repeating_patterns("abc") is False
|
|
assert has_repeating_patterns("ab") is False
|
|
|
|
def test_validate_jwt_secret_strength_short(self):
|
|
"""Test validation rejects short secrets."""
|
|
from app.core.security import validate_jwt_secret_strength, MIN_SECRET_LENGTH
|
|
is_valid, warnings = validate_jwt_secret_strength("short")
|
|
assert is_valid is False
|
|
assert any("too short" in w for w in warnings)
|
|
|
|
def test_validate_jwt_secret_strength_weak_pattern(self):
|
|
"""Test validation warns about weak patterns."""
|
|
from app.core.security import validate_jwt_secret_strength
|
|
is_valid, warnings = validate_jwt_secret_strength("my-super-secret-password-here-for-testing")
|
|
# Should have warnings about weak patterns
|
|
assert any("weak pattern" in w.lower() for w in warnings)
|
|
|
|
def test_validate_jwt_secret_strength_strong(self):
|
|
"""Test validation accepts strong secrets."""
|
|
from app.core.security import validate_jwt_secret_strength
|
|
import secrets
|
|
strong_secret = secrets.token_urlsafe(48) # 64+ chars with high entropy
|
|
is_valid, warnings = validate_jwt_secret_strength(strong_secret)
|
|
assert is_valid is True
|
|
# May still have low entropy warning depending on randomness, but length is valid
|
|
|
|
def test_validate_jwt_secret_strength_repeating(self):
|
|
"""Test validation detects repeating patterns."""
|
|
from app.core.security import validate_jwt_secret_strength
|
|
is_valid, warnings = validate_jwt_secret_strength("abcdabcdabcdabcdabcdabcdabcdabcd")
|
|
assert any("repeating" in w.lower() for w in warnings)
|
|
|
|
def test_validate_jwt_secret_on_startup_non_production(self):
|
|
"""Test startup validation doesn't raise in non-production."""
|
|
from app.core.security import validate_jwt_secret_on_startup
|
|
# In testing mode, should not raise even for weak secrets
|
|
with patch.dict(os.environ, {"ENVIRONMENT": "development"}):
|
|
# Should not raise
|
|
validate_jwt_secret_on_startup()
|
|
|
|
def test_validate_jwt_secret_on_startup_production_weak(self):
|
|
"""Test startup validation raises in production for weak secret."""
|
|
from app.core.security import validate_jwt_secret_on_startup
|
|
from app.core.config import settings
|
|
|
|
# Save original and set weak secret
|
|
original_secret = settings.JWT_SECRET_KEY
|
|
|
|
try:
|
|
# Mock a weak secret
|
|
with patch.object(settings, 'JWT_SECRET_KEY', 'weak'):
|
|
with patch.dict(os.environ, {"ENVIRONMENT": "production"}):
|
|
with pytest.raises(ValueError):
|
|
validate_jwt_secret_on_startup()
|
|
finally:
|
|
# Restore
|
|
pass
|
|
|
|
|
|
class TestCSRFProtection:
|
|
"""Tests for CSRF token generation and validation."""
|
|
|
|
def test_generate_csrf_token(self):
|
|
"""Test CSRF token generation."""
|
|
from app.core.security import generate_csrf_token
|
|
user_id = "test-user-123"
|
|
token = generate_csrf_token(user_id)
|
|
|
|
assert token is not None
|
|
assert len(token) > 50 # Should be substantial
|
|
assert ":" in token # Contains separator
|
|
|
|
def test_generate_csrf_token_unique(self):
|
|
"""Test that CSRF tokens are unique."""
|
|
from app.core.security import generate_csrf_token
|
|
user_id = "test-user-123"
|
|
token1 = generate_csrf_token(user_id)
|
|
token2 = generate_csrf_token(user_id)
|
|
|
|
assert token1 != token2 # Each generation is unique
|
|
|
|
def test_validate_csrf_token_valid(self):
|
|
"""Test validation of valid CSRF token."""
|
|
from app.core.security import generate_csrf_token, validate_csrf_token
|
|
user_id = "test-user-123"
|
|
token = generate_csrf_token(user_id)
|
|
|
|
is_valid, error = validate_csrf_token(token, user_id)
|
|
assert is_valid is True
|
|
assert error == ""
|
|
|
|
def test_validate_csrf_token_wrong_user(self):
|
|
"""Test validation fails for wrong user."""
|
|
from app.core.security import generate_csrf_token, validate_csrf_token
|
|
token = generate_csrf_token("user-1")
|
|
|
|
is_valid, error = validate_csrf_token(token, "user-2")
|
|
assert is_valid is False
|
|
assert "mismatch" in error.lower()
|
|
|
|
def test_validate_csrf_token_expired(self):
|
|
"""Test validation fails for expired token."""
|
|
from app.core.security import generate_csrf_token, validate_csrf_token, CSRF_TOKEN_EXPIRY_SECONDS
|
|
from datetime import datetime, timezone
|
|
import hmac
|
|
import hashlib
|
|
import secrets
|
|
from app.core.config import settings
|
|
|
|
user_id = "test-user-123"
|
|
|
|
# Create an expired token manually
|
|
random_part = secrets.token_urlsafe(32)
|
|
expired_timestamp = int(datetime.now(timezone.utc).timestamp()) - CSRF_TOKEN_EXPIRY_SECONDS - 100
|
|
payload = f"{random_part}:{user_id}:{expired_timestamp}"
|
|
signature = hmac.new(
|
|
settings.JWT_SECRET_KEY.encode(),
|
|
payload.encode(),
|
|
hashlib.sha256
|
|
).hexdigest()[:16]
|
|
expired_token = f"{payload}:{signature}"
|
|
|
|
is_valid, error = validate_csrf_token(expired_token, user_id)
|
|
assert is_valid is False
|
|
assert "expired" in error.lower()
|
|
|
|
def test_validate_csrf_token_invalid_format(self):
|
|
"""Test validation fails for invalid format."""
|
|
from app.core.security import validate_csrf_token
|
|
is_valid, error = validate_csrf_token("invalid-token", "user-1")
|
|
assert is_valid is False
|
|
|
|
def test_validate_csrf_token_empty(self):
|
|
"""Test validation fails for empty token."""
|
|
from app.core.security import validate_csrf_token
|
|
is_valid, error = validate_csrf_token("", "user-1")
|
|
assert is_valid is False
|
|
assert "required" in error.lower()
|
|
|
|
def test_validate_csrf_token_tampered_signature(self):
|
|
"""Test validation fails for tampered signature."""
|
|
from app.core.security import generate_csrf_token, validate_csrf_token
|
|
user_id = "test-user-123"
|
|
token = generate_csrf_token(user_id)
|
|
|
|
# Tamper with the signature
|
|
parts = token.split(":")
|
|
parts[-1] = "tamperedsig123"
|
|
tampered_token = ":".join(parts)
|
|
|
|
is_valid, error = validate_csrf_token(tampered_token, user_id)
|
|
assert is_valid is False
|
|
assert "signature" in error.lower() or "invalid" in error.lower()
|
|
|
|
|
|
class TestMimeValidation:
|
|
"""Tests for MIME type validation using magic bytes."""
|
|
|
|
def test_detect_jpeg(self):
|
|
"""Test detection of JPEG files."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# JPEG magic bytes
|
|
jpeg_content = b'\xFF\xD8\xFF\xE0' + b'\x00' * 100
|
|
mime = service.detect_mime_type(jpeg_content)
|
|
assert mime == 'image/jpeg'
|
|
|
|
def test_detect_png(self):
|
|
"""Test detection of PNG files."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# PNG magic bytes
|
|
png_content = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100
|
|
mime = service.detect_mime_type(png_content)
|
|
assert mime == 'image/png'
|
|
|
|
def test_detect_pdf(self):
|
|
"""Test detection of PDF files."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# PDF magic bytes
|
|
pdf_content = b'%PDF-1.4' + b'\x00' * 100
|
|
mime = service.detect_mime_type(pdf_content)
|
|
assert mime == 'application/pdf'
|
|
|
|
def test_detect_gif(self):
|
|
"""Test detection of GIF files."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# GIF87a magic bytes
|
|
gif_content = b'GIF87a' + b'\x00' * 100
|
|
mime = service.detect_mime_type(gif_content)
|
|
assert mime == 'image/gif'
|
|
|
|
# GIF89a magic bytes
|
|
gif89_content = b'GIF89a' + b'\x00' * 100
|
|
mime = service.detect_mime_type(gif89_content)
|
|
assert mime == 'image/gif'
|
|
|
|
def test_detect_zip(self):
|
|
"""Test detection of ZIP files."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# ZIP magic bytes
|
|
zip_content = b'PK\x03\x04' + b'\x00' * 100
|
|
mime = service.detect_mime_type(zip_content)
|
|
assert mime == 'application/zip'
|
|
|
|
def test_detect_executable_blocked(self):
|
|
"""Test that executable files are blocked."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# Windows executable magic bytes
|
|
exe_content = b'MZ' + b'\x00' * 100
|
|
is_valid, detected, error = service.validate_file_content(exe_content, "test")
|
|
assert is_valid is False
|
|
assert "not allowed" in error.lower() or "security" in error.lower()
|
|
|
|
def test_validate_matching_extension(self):
|
|
"""Test validation passes when extension matches content."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
jpeg_content = b'\xFF\xD8\xFF\xE0' + b'\x00' * 100
|
|
is_valid, detected, error = service.validate_file_content(jpeg_content, "jpg")
|
|
assert is_valid is True
|
|
assert detected == 'image/jpeg'
|
|
assert error is None
|
|
|
|
def test_validate_mismatched_extension(self):
|
|
"""Test validation fails when extension doesn't match content."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# PNG content but .jpg extension
|
|
png_content = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100
|
|
is_valid, detected, error = service.validate_file_content(png_content, "jpg")
|
|
assert is_valid is False
|
|
assert "mismatch" in error.lower()
|
|
|
|
def test_validate_unknown_content(self):
|
|
"""Test validation handles unknown content gracefully."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# Random bytes with no known signature
|
|
unknown_content = b'\x00\x01\x02\x03\x04\x05' + b'\x00' * 100
|
|
is_valid, detected, error = service.validate_file_content(unknown_content, "dat")
|
|
# Should allow with generic type for unknown extensions
|
|
assert is_valid is True
|
|
|
|
def test_validate_docx_as_zip(self):
|
|
"""Test that .docx files (ZIP-based) are accepted."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# DOCX is a ZIP container
|
|
docx_content = b'PK\x03\x04' + b'\x00' * 100
|
|
is_valid, detected, error = service.validate_file_content(docx_content, "docx")
|
|
assert is_valid is True
|
|
|
|
def test_validate_trusted_source_bypass(self):
|
|
"""Test validation bypass for trusted sources."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService(bypass_for_trusted=True)
|
|
|
|
# Even suspicious content should pass for trusted source
|
|
suspicious_content = b'MZ' + b'\x00' * 100
|
|
is_valid, detected, error = service.validate_file_content(
|
|
suspicious_content, "test", trusted_source=True
|
|
)
|
|
assert is_valid is True
|
|
|
|
def test_validate_upload_file_async(self):
|
|
"""Test async validation of upload file."""
|
|
import asyncio
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
async def test():
|
|
jpeg_content = b'\xFF\xD8\xFF\xE0' + b'\x00' * 100
|
|
is_valid, detected, error = await service.validate_upload_file(
|
|
jpeg_content, "photo.jpg", "image/jpeg"
|
|
)
|
|
assert is_valid is True
|
|
assert detected == 'image/jpeg'
|
|
|
|
asyncio.run(test())
|
|
|
|
def test_detect_webp(self):
|
|
"""Test detection of WebP files."""
|
|
from app.services.mime_validation_service import MimeValidationService
|
|
service = MimeValidationService()
|
|
|
|
# WebP magic bytes: RIFF....WEBP
|
|
webp_content = b'RIFF\x00\x00\x00\x00WEBP' + b'\x00' * 100
|
|
mime = service.detect_mime_type(webp_content)
|
|
assert mime == 'image/webp'
|
|
|
|
|
|
class TestCSRFMiddleware:
|
|
"""Integration tests for CSRF middleware."""
|
|
|
|
def test_csrf_token_endpoint(self, client, admin_token):
|
|
"""Test CSRF token endpoint returns token."""
|
|
response = client.get(
|
|
"/api/auth/csrf-token",
|
|
headers={"Authorization": f"Bearer {admin_token}"}
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "csrf_token" in data
|
|
assert "expires_in" in data
|
|
assert data["expires_in"] == 3600
|
|
|
|
def test_csrf_token_endpoint_v1(self, client, admin_token):
|
|
"""Test CSRF token endpoint on v1 namespace."""
|
|
response = client.get(
|
|
"/api/v1/auth/csrf-token",
|
|
headers={"Authorization": f"Bearer {admin_token}"}
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "csrf_token" in data
|
|
|
|
|
|
# Import fixtures from conftest
|
|
from tests.conftest import db, mock_redis, client, admin_token
|