Files
PROJECT-CONTORL/backend/tests/test_security_validation.py
beabigegg 679b89ae4c feat: implement security, error resilience, and query optimization proposals
Security Validation (enhance-security-validation):
- JWT secret validation with entropy checking and pattern detection
- CSRF protection middleware with token generation/validation
- Frontend CSRF token auto-injection for DELETE/PUT/PATCH requests
- MIME type validation with magic bytes detection for file uploads

Error Resilience (add-error-resilience):
- React ErrorBoundary component with fallback UI and retry functionality
- ErrorBoundaryWithI18n wrapper for internationalization support
- Page-level and section-level error boundaries in App.tsx

Query Performance (optimize-query-performance):
- Query monitoring utility with threshold warnings
- N+1 query fixes using joinedload/selectinload
- Optimized project members, tasks, and subtasks endpoints

Bug Fixes:
- WebSocket session management (P0): Return primitives instead of ORM objects
- LIKE query injection (P1): Escape special characters in search queries

Tests: 543 backend tests, 56 frontend tests passing

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 18:41:19 +08:00

403 lines
16 KiB
Python

"""
Tests for security validation features:
1. JWT secret validation (length and entropy)
2. CSRF protection
3. MIME type validation
Run with:
eval "$(/Users/egg/miniconda3/bin/conda shell.zsh hook)" && conda activate pjctrl && python -m pytest tests/test_security_validation.py -v
"""
import os
import pytest
import time
from unittest.mock import patch, MagicMock
from io import BytesIO
# Set testing environment before importing app modules.
# Every `from app...` import in this file runs after this line (they live
# inside the test functions), so the flag is already present when they load.
# NOTE(review): presumably the app configuration reads TESTING at import
# time -- confirm against app.core.config.
os.environ["TESTING"] = "true"
from fastapi import Request
from fastapi.testclient import TestClient
class TestJWTSecretValidation:
    """Tests for the JWT secret validation helpers in ``app.core.security``.

    Exercises ``calculate_entropy``, ``has_repeating_patterns``,
    ``validate_jwt_secret_strength`` and ``validate_jwt_secret_on_startup``.
    Application modules are imported inside each test rather than at module
    level.
    """

    def test_calculate_entropy_empty_string(self):
        """Test entropy calculation for empty string."""
        from app.core.security import calculate_entropy

        assert calculate_entropy("") == 0.0

    def test_calculate_entropy_single_char(self):
        """Test entropy for string with single repeated character."""
        from app.core.security import calculate_entropy

        # All same characters = 0 entropy per character
        entropy = calculate_entropy("aaaaaaa")
        assert entropy == 0.0

    def test_calculate_entropy_random_string(self):
        """Test entropy for a random-looking string."""
        from app.core.security import calculate_entropy

        # A string with high variability should have high entropy
        entropy = calculate_entropy("aB3$xY9!qW2@eR5#")
        assert entropy > 50  # Should be reasonably high

    def test_calculate_entropy_alphanumeric(self):
        """Test entropy for alphanumeric string."""
        from app.core.security import calculate_entropy

        # Standard alphanumeric has moderate entropy
        entropy = calculate_entropy("abcdefghijklmnop")
        assert entropy > 30

    def test_has_repeating_patterns_true(self):
        """Test detection of repeating patterns."""
        from app.core.security import has_repeating_patterns

        assert has_repeating_patterns("abcabcabcabc") is True
        assert has_repeating_patterns("aaaaaaaaaaaa") is True
        assert has_repeating_patterns("xyzxyzxyzxyz") is True

    def test_has_repeating_patterns_false(self):
        """Test non-repeating patterns."""
        from app.core.security import has_repeating_patterns

        assert has_repeating_patterns("abcdefghijkl") is False
        assert has_repeating_patterns("X8k#2pL!9mNq") is False

    def test_has_repeating_patterns_short_string(self):
        """Test short strings (less than 8 chars)."""
        from app.core.security import has_repeating_patterns

        assert has_repeating_patterns("abc") is False
        assert has_repeating_patterns("ab") is False

    def test_validate_jwt_secret_strength_short(self):
        """Test validation rejects short secrets."""
        from app.core.security import validate_jwt_secret_strength, MIN_SECRET_LENGTH

        short_secret = "short"
        # Pin the premise of the test: the sample really is below the
        # configured minimum, so a "too short" warning is the expected outcome.
        assert len(short_secret) < MIN_SECRET_LENGTH

        is_valid, warnings = validate_jwt_secret_strength(short_secret)
        assert is_valid is False
        assert any("too short" in w for w in warnings)

    def test_validate_jwt_secret_strength_weak_pattern(self):
        """Test validation warns about weak patterns."""
        from app.core.security import validate_jwt_secret_strength

        is_valid, warnings = validate_jwt_secret_strength(
            "my-super-secret-password-here-for-testing"
        )
        # Should have warnings about weak patterns
        assert any("weak pattern" in w.lower() for w in warnings)

    def test_validate_jwt_secret_strength_strong(self):
        """Test validation accepts strong secrets."""
        from app.core.security import validate_jwt_secret_strength
        import secrets

        # 48 random bytes encode to 64 URL-safe characters.
        strong_secret = secrets.token_urlsafe(48)
        is_valid, _warnings = validate_jwt_secret_strength(strong_secret)
        assert is_valid is True
        # May still have low entropy warning depending on randomness, but
        # length is valid -- so the warnings are deliberately not asserted on.

    def test_validate_jwt_secret_strength_repeating(self):
        """Test validation detects repeating patterns."""
        from app.core.security import validate_jwt_secret_strength

        is_valid, warnings = validate_jwt_secret_strength(
            "abcdabcdabcdabcdabcdabcdabcdabcd"
        )
        assert any("repeating" in w.lower() for w in warnings)

    def test_validate_jwt_secret_on_startup_non_production(self):
        """Test startup validation doesn't raise in non-production."""
        from app.core.security import validate_jwt_secret_on_startup

        # In testing mode, should not raise even for weak secrets
        with patch.dict(os.environ, {"ENVIRONMENT": "development"}):
            # Should not raise
            validate_jwt_secret_on_startup()

    def test_validate_jwt_secret_on_startup_production_weak(self):
        """Test startup validation raises in production for weak secret."""
        from app.core.security import validate_jwt_secret_on_startup
        from app.core.config import settings

        # patch.object and patch.dict undo their changes when the ``with``
        # block exits, so no manual save/restore of the secret is needed.
        with patch.object(settings, 'JWT_SECRET_KEY', 'weak'):
            with patch.dict(os.environ, {"ENVIRONMENT": "production"}):
                with pytest.raises(ValueError):
                    validate_jwt_secret_on_startup()
class TestCSRFProtection:
    """Unit tests for ``generate_csrf_token`` / ``validate_csrf_token``.

    ``validate_csrf_token`` is expected to return an ``(is_valid, error)``
    pair; each test unpacks it and checks the flag and, where relevant, the
    wording of the error message.
    """

    def test_generate_csrf_token(self):
        """A generated token is non-empty, long, and colon-separated."""
        from app.core.security import generate_csrf_token

        csrf = generate_csrf_token("test-user-123")

        assert csrf is not None
        assert len(csrf) > 50  # Should be substantial
        assert ":" in csrf  # Contains separator

    def test_generate_csrf_token_unique(self):
        """Two tokens generated for the same user must differ."""
        from app.core.security import generate_csrf_token

        uid = "test-user-123"
        first = generate_csrf_token(uid)
        second = generate_csrf_token(uid)

        assert first != second  # Each generation is unique

    def test_validate_csrf_token_valid(self):
        """A freshly generated token validates for the same user."""
        from app.core.security import generate_csrf_token, validate_csrf_token

        uid = "test-user-123"
        fresh = generate_csrf_token(uid)

        ok, message = validate_csrf_token(fresh, uid)

        assert ok is True
        assert message == ""

    def test_validate_csrf_token_wrong_user(self):
        """A token issued to one user is rejected for another."""
        from app.core.security import generate_csrf_token, validate_csrf_token

        issued = generate_csrf_token("user-1")

        ok, message = validate_csrf_token(issued, "user-2")

        assert ok is False
        assert "mismatch" in message.lower()

    def test_validate_csrf_token_expired(self):
        """A correctly signed token with a stale timestamp is rejected."""
        from app.core.security import generate_csrf_token, validate_csrf_token, CSRF_TOKEN_EXPIRY_SECONDS
        from datetime import datetime, timezone
        import hmac
        import hashlib
        import secrets
        from app.core.config import settings

        uid = "test-user-123"

        # Hand-build a token as "<random>:<user>:<timestamp>:<signature>",
        # with the timestamp pushed 100 seconds past the expiry window.
        nonce = secrets.token_urlsafe(32)
        current_ts = int(datetime.now(timezone.utc).timestamp())
        stale_ts = current_ts - CSRF_TOKEN_EXPIRY_SECONDS - 100
        unsigned = f"{nonce}:{uid}:{stale_ts}"

        full_digest = hmac.new(
            settings.JWT_SECRET_KEY.encode(),
            unsigned.encode(),
            hashlib.sha256
        ).hexdigest()
        # Only the first 16 hex characters are used as the signature.
        stale_token = f"{unsigned}:{full_digest[:16]}"

        ok, message = validate_csrf_token(stale_token, uid)

        assert ok is False
        assert "expired" in message.lower()

    def test_validate_csrf_token_invalid_format(self):
        """A string that is not in token format is rejected."""
        from app.core.security import validate_csrf_token

        ok, message = validate_csrf_token("invalid-token", "user-1")

        assert ok is False

    def test_validate_csrf_token_empty(self):
        """An empty token is rejected with a 'required' error."""
        from app.core.security import validate_csrf_token

        ok, message = validate_csrf_token("", "user-1")

        assert ok is False
        assert "required" in message.lower()

    def test_validate_csrf_token_tampered_signature(self):
        """Replacing the trailing signature segment invalidates the token."""
        from app.core.security import generate_csrf_token, validate_csrf_token

        uid = "test-user-123"
        genuine = generate_csrf_token(uid)

        # Swap the last colon-separated segment (the signature) for junk.
        *leading, _signature = genuine.split(":")
        forged = ":".join([*leading, "tamperedsig123"])

        ok, message = validate_csrf_token(forged, uid)

        assert ok is False
        lowered = message.lower()
        assert any(word in lowered for word in ("signature", "invalid"))
class TestMimeValidation:
    """Magic-byte MIME detection and validation tests for ``MimeValidationService``.

    Each sample is a known file signature followed by 100 zero bytes.
    """

    # Zero padding appended after every signature used below.
    _FILLER = b'\x00' * 100

    @staticmethod
    def _new_service(**options):
        """Import ``MimeValidationService`` lazily and construct it with *options*."""
        from app.services.mime_validation_service import MimeValidationService

        return MimeValidationService(**options)

    @classmethod
    def _sample(cls, signature):
        """Return *signature* followed by the shared zero padding."""
        return signature + cls._FILLER

    def test_detect_jpeg(self):
        """JPEG signature is detected as image/jpeg."""
        detector = self._new_service()
        # JPEG magic bytes
        sample = self._sample(b'\xFF\xD8\xFF\xE0')
        assert detector.detect_mime_type(sample) == 'image/jpeg'

    def test_detect_png(self):
        """PNG signature is detected as image/png."""
        detector = self._new_service()
        # PNG magic bytes
        sample = self._sample(b'\x89PNG\r\n\x1a\n')
        assert detector.detect_mime_type(sample) == 'image/png'

    def test_detect_pdf(self):
        """PDF header is detected as application/pdf."""
        detector = self._new_service()
        # PDF magic bytes
        sample = self._sample(b'%PDF-1.4')
        assert detector.detect_mime_type(sample) == 'application/pdf'

    def test_detect_gif(self):
        """Both GIF87a and GIF89a headers are detected as image/gif."""
        detector = self._new_service()
        # GIF87a first, then GIF89a
        for header in (b'GIF87a', b'GIF89a'):
            assert detector.detect_mime_type(self._sample(header)) == 'image/gif'

    def test_detect_zip(self):
        """ZIP local-file header is detected as application/zip."""
        detector = self._new_service()
        # ZIP magic bytes
        sample = self._sample(b'PK\x03\x04')
        assert detector.detect_mime_type(sample) == 'application/zip'

    def test_detect_executable_blocked(self):
        """Content starting with the Windows 'MZ' signature is rejected."""
        detector = self._new_service()
        # Windows executable magic bytes
        payload = self._sample(b'MZ')

        is_valid, detected, error = detector.validate_file_content(payload, "test")

        assert is_valid is False
        message = error.lower()
        assert "not allowed" in message or "security" in message

    def test_validate_matching_extension(self):
        """JPEG content with a jpg extension validates without error."""
        detector = self._new_service()
        payload = self._sample(b'\xFF\xD8\xFF\xE0')

        is_valid, detected, error = detector.validate_file_content(payload, "jpg")

        assert is_valid is True
        assert detected == 'image/jpeg'
        assert error is None

    def test_validate_mismatched_extension(self):
        """PNG content presented with a jpg extension is rejected."""
        detector = self._new_service()
        # PNG content but .jpg extension
        payload = self._sample(b'\x89PNG\r\n\x1a\n')

        is_valid, detected, error = detector.validate_file_content(payload, "jpg")

        assert is_valid is False
        assert "mismatch" in error.lower()

    def test_validate_unknown_content(self):
        """Content with no known signature is accepted for an unknown extension."""
        detector = self._new_service()
        # Random bytes with no known signature
        payload = self._sample(b'\x00\x01\x02\x03\x04\x05')

        is_valid, detected, error = detector.validate_file_content(payload, "dat")

        # Should allow with generic type for unknown extensions
        assert is_valid is True

    def test_validate_docx_as_zip(self):
        """ZIP-signature content is accepted for the docx extension."""
        detector = self._new_service()
        # DOCX is a ZIP container
        payload = self._sample(b'PK\x03\x04')

        is_valid, detected, error = detector.validate_file_content(payload, "docx")

        assert is_valid is True

    def test_validate_trusted_source_bypass(self):
        """With bypass enabled, a trusted source passes even with 'MZ' content."""
        detector = self._new_service(bypass_for_trusted=True)
        # Even suspicious content should pass for trusted source
        payload = self._sample(b'MZ')

        is_valid, detected, error = detector.validate_file_content(
            payload, "test", trusted_source=True
        )

        assert is_valid is True

    def test_validate_upload_file_async(self):
        """The async ``validate_upload_file`` entry point accepts a JPEG upload."""
        import asyncio

        detector = self._new_service()

        async def check_upload():
            payload = self._sample(b'\xFF\xD8\xFF\xE0')
            outcome = await detector.validate_upload_file(
                payload, "photo.jpg", "image/jpeg"
            )
            is_valid, detected, error = outcome
            assert is_valid is True
            assert detected == 'image/jpeg'

        asyncio.run(check_upload())

    def test_detect_webp(self):
        """RIFF container tagged WEBP is detected as image/webp."""
        detector = self._new_service()
        # WebP magic bytes: RIFF....WEBP
        sample = self._sample(b'RIFF\x00\x00\x00\x00WEBP')
        assert detector.detect_mime_type(sample) == 'image/webp'
class TestCSRFMiddleware:
    """Integration tests for the CSRF token endpoints.

    Both tests rely on the ``client`` and ``admin_token`` fixtures and send
    an authenticated GET with a bearer token.
    """

    def test_csrf_token_endpoint(self, client, admin_token):
        """GET /api/auth/csrf-token returns a token and a 3600 s lifetime."""
        auth_headers = {"Authorization": f"Bearer {admin_token}"}

        reply = client.get("/api/auth/csrf-token", headers=auth_headers)

        assert reply.status_code == 200
        payload = reply.json()
        for expected_key in ("csrf_token", "expires_in"):
            assert expected_key in payload
        assert payload["expires_in"] == 3600

    def test_csrf_token_endpoint_v1(self, client, admin_token):
        """GET /api/v1/auth/csrf-token also returns a token."""
        auth_headers = {"Authorization": f"Bearer {admin_token}"}

        reply = client.get("/api/v1/auth/csrf-token", headers=auth_headers)

        assert reply.status_code == 200
        payload = reply.json()
        assert "csrf_token" in payload
# Import fixtures from conftest.
# NOTE(review): pytest normally discovers fixtures defined in conftest.py
# automatically, so this explicit import is likely redundant -- confirm that
# tests/conftest.py is on the collection path before removing it.
from tests.conftest import db, mock_redis, client, admin_token