- Backend (FastAPI):
  - AuditLog and AuditAlert models with Alembic migration
  - AuditService with SHA-256 checksum for log integrity
  - AuditMiddleware for request metadata extraction (IP, user_agent)
  - Integrated audit logging into Task, Project, Blocker APIs
  - Query API with filtering, pagination, CSV export
  - Integrity verification endpoint
  - Sensitive operation alerts with acknowledgement
- Frontend (React + Vite):
  - Admin AuditPage with filters and export
  - ResourceHistory component for change tracking
  - Audit service for API calls
- Testing:
  - 15 tests covering service and API endpoints
- OpenSpec:
  - add-audit-trail change archived

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
44 lines
1.4 KiB
Python
44 lines
1.4 KiB
Python
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.requests import Request
|
|
from typing import Optional
|
|
|
|
|
|
class AuditMiddleware(BaseHTTPMiddleware):
    """Middleware that attaches audit metadata to every incoming request.

    Before the request is handed to the rest of the application, a dict with
    the keys ``ip_address``, ``user_agent``, ``method`` and ``path`` is stored
    on ``request.state.audit_metadata``.
    """

    async def dispatch(self, request: Request, call_next):
        """Record audit metadata on ``request.state`` and forward the request.

        Args:
            request: The incoming request.
            call_next: Awaitable callable that receives the request and
                returns the downstream response.

        Returns:
            The response returned by ``call_next``, unmodified.
        """
        request.state.audit_metadata = {
            "ip_address": self.get_client_ip(request),
            # Missing User-Agent is recorded as an empty string, not None.
            "user_agent": request.headers.get("user-agent", ""),
            "method": request.method,
            "path": str(request.url.path),
        }
        return await call_next(request)

    @staticmethod
    def get_client_ip(request: Request) -> str:
        """Return the best available client IP address for *request*.

        Sources are tried in this order:

        1. The first non-empty entry of the ``X-Forwarded-For`` header.
        2. The ``X-Real-IP`` header, with surrounding whitespace removed.
        3. ``request.client.host`` (the directly connected peer).
        4. The literal string ``"unknown"``.

        Blank header values (for example ``" "`` or a leading comma in
        ``X-Forwarded-For``) are skipped so an empty string is never
        reported as the address.

        NOTE(review): both headers are supplied by the peer, so they are only
        trustworthy when a trusted reverse proxy sets or overwrites them --
        confirm against the deployment before relying on this value.
        """
        forwarded = request.headers.get("x-forwarded-for")
        if forwarded:
            # The header is a comma-separated chain; the left-most usable
            # entry is the one closest to the original client.
            for entry in forwarded.split(","):
                candidate = entry.strip()
                if candidate:
                    return candidate

        real_ip = (request.headers.get("x-real-ip") or "").strip()
        if real_ip:
            return real_ip

        # Fall back to the address of the direct connection, when known.
        if request.client:
            return request.client.host

        return "unknown"
|
|
|
|
|
|
def get_audit_metadata(request: Request) -> Optional[dict]:
    """Return the audit metadata stored on the request, if present.

    Args:
        request: Request whose ``state`` may carry an ``audit_metadata``
            attribute.

    Returns:
        The value of ``request.state.audit_metadata``, or ``None`` when that
        attribute has not been set.
    """
    state = request.state
    try:
        return state.audit_metadata
    except AttributeError:
        # Nothing was recorded on this request's state.
        return None
|