Files
PROJECT-CONTORL/backend/app/services/audit_service.py
beabigegg 0ef78e13ff feat: implement audit trail module
- Backend (FastAPI):
  - AuditLog and AuditAlert models with Alembic migration
  - AuditService with SHA-256 checksum for log integrity
  - AuditMiddleware for request metadata extraction (IP, user_agent)
  - Integrated audit logging into Task, Project, Blocker APIs
  - Query API with filtering, pagination, CSV export
  - Integrity verification endpoint
  - Sensitive operation alerts with acknowledgement

- Frontend (React + Vite):
  - Admin AuditPage with filters and export
  - ResourceHistory component for change tracking
  - Audit service for API calls

- Testing:
  - 15 tests covering service and API endpoints

- OpenSpec:
  - add-audit-trail change archived

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 21:21:18 +08:00

211 lines
6.6 KiB
Python

import hashlib
import json
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List

from sqlalchemy.orm import Session

from app.models import (
    AuditLog, AuditAlert, AuditAction, SensitivityLevel,
    EVENT_SENSITIVITY, ALERT_EVENTS, User
)
class AuditService:
    """Service for managing audit logs and alerts.

    Every method is static and receives the SQLAlchemy session from the
    caller. The methods in this class only ``flush()``; none of them calls
    ``commit()``, so the caller stays in charge of the transaction.
    """

    # Bulk delete threshold: more than 5 deletes in 5 minutes
    BULK_DELETE_THRESHOLD = 5
    BULK_DELETE_WINDOW_MINUTES = 5

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Replaces the deprecated ``datetime.utcnow()``. The result is kept
        naive on purpose: ``created_at.isoformat()`` is part of the checksum
        input, and a timezone-aware value would append an offset suffix and
        change the checksum format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def calculate_checksum(
        event_type: str,
        resource_id: Optional[str],
        user_id: Optional[str],
        changes: Optional[Dict],
        created_at: datetime
    ) -> str:
        """Calculate SHA-256 checksum for audit log integrity.

        The digest is taken over the ``|``-joined string
        ``event_type|resource_id|user_id|changes_json|created_at``, where
        missing ids become empty strings, ``changes`` is serialised with
        sorted dict keys, and ``created_at`` is rendered with ``isoformat()``.

        Args:
            event_type: Event identifier stored on the log.
            resource_id: Id of the affected resource, or ``None``.
            user_id: Id of the acting user, or ``None``.
            changes: JSON-serialisable change payload. Any falsy value
                (``None``, ``[]``, ``{}``) contributes an empty string.
            created_at: Timestamp stored on the log.

        Returns:
            The hex-encoded SHA-256 digest.
        """
        # Falsy payloads all hash identically; keep this so checksums that
        # were already stored continue to verify.
        changes_json = json.dumps(changes, sort_keys=True) if changes else ""
        content = (
            f"{event_type}|{resource_id or ''}|{user_id or ''}"
            f"|{changes_json}|{created_at.isoformat()}"
        )
        return hashlib.sha256(content.encode()).hexdigest()

    @staticmethod
    def verify_checksum(log: AuditLog) -> bool:
        """Verify that an audit log's checksum is valid.

        Args:
            log: Object exposing ``event_type``, ``resource_id``, ``user_id``,
                ``changes``, ``created_at`` and ``checksum``.

        Returns:
            ``True`` when the checksum recomputed from the log's fields equals
            the stored one. A log without ``created_at`` cannot be recomputed
            and is reported as invalid instead of raising.
        """
        if log.created_at is None:
            return False
        expected = AuditService.calculate_checksum(
            log.event_type,
            log.resource_id,
            log.user_id,
            log.changes,
            log.created_at,
        )
        return log.checksum == expected

    @staticmethod
    def get_sensitivity_level(event_type: str) -> SensitivityLevel:
        """Get sensitivity level for an event type.

        Event types missing from ``EVENT_SENSITIVITY`` fall back to
        ``SensitivityLevel.LOW``.
        """
        return EVENT_SENSITIVITY.get(event_type, SensitivityLevel.LOW)

    @staticmethod
    def detect_changes(old_values: Dict[str, Any], new_values: Dict[str, Any]) -> List[Dict]:
        """Detect changes between old and new values.

        Args:
            old_values: Field values before the change; ``None`` is treated
                as an empty mapping.
            new_values: Field values after the change; ``None`` is treated
                as an empty mapping.

        Returns:
            One ``{"field", "old_value", "new_value"}`` dict per field whose
            value differs, ordered by field name so the result is
            deterministic. A key present on only one side is compared against
            ``None``. ``datetime`` values are reported as ISO-8601 strings.
        """
        old_values = old_values or {}
        new_values = new_values or {}

        changes = []
        # Sort the key union: plain set iteration order varies between runs.
        for key in sorted(old_values.keys() | new_values.keys()):
            old_val = old_values.get(key)
            new_val = new_values.get(key)

            # Convert datetime to string for comparison
            if isinstance(old_val, datetime):
                old_val = old_val.isoformat()
            if isinstance(new_val, datetime):
                new_val = new_val.isoformat()

            if old_val != new_val:
                changes.append({
                    "field": key,
                    "old_value": old_val,
                    "new_value": new_val,
                })
        return changes

    @staticmethod
    def log_event(
        db: Session,
        event_type: str,
        resource_type: str,
        action: AuditAction,
        user_id: Optional[str] = None,
        resource_id: Optional[str] = None,
        changes: Optional[List[Dict]] = None,
        request_metadata: Optional[Dict] = None,
    ) -> AuditLog:
        """Log an audit event.

        Builds an ``AuditLog`` row with its integrity checksum, adds it to the
        session and flushes. It then raises an alert when ``event_type`` is
        listed in ``ALERT_EVENTS`` and, for delete actions, runs the
        bulk-delete check.

        Args:
            db: Active session; it is flushed but not committed here.
            event_type: Event identifier; also selects the sensitivity level.
            resource_type: Kind of resource affected.
            action: Action performed; its ``.value`` is what gets stored.
            user_id: Acting user, if known.
            resource_id: Affected resource, if any.
            changes: Change list, e.g. the output of ``detect_changes``.
            request_metadata: Request metadata stored as-is on the log.

        Returns:
            The newly added ``AuditLog``.
        """
        # A single timestamp is used for the checksum and the stored row so
        # verify_checksum can reproduce the digest from the row's own fields.
        now = AuditService._utcnow()
        sensitivity = AuditService.get_sensitivity_level(event_type)
        checksum = AuditService.calculate_checksum(
            event_type, resource_id, user_id, changes, now
        )

        log = AuditLog(
            id=str(uuid.uuid4()),
            event_type=event_type,
            resource_type=resource_type,
            resource_id=resource_id,
            user_id=user_id,
            action=action.value,
            changes=changes,
            request_metadata=request_metadata,
            sensitivity_level=sensitivity.value,
            checksum=checksum,
            created_at=now,
        )
        db.add(log)
        # Flush before the follow-up queries below (alert creation and the
        # bulk-delete count) run against the same session.
        db.flush()

        # Check if this event should trigger an alert
        if event_type in ALERT_EVENTS:
            AuditService.create_alert(db, log, event_type)

        # Check for bulk delete pattern
        if action == AuditAction.DELETE:
            AuditService.check_bulk_delete(db, user_id, now)

        return log

    @staticmethod
    def create_alert(
        db: Session,
        audit_log: AuditLog,
        alert_type: str,
        message: Optional[str] = None
    ) -> AuditAlert:
        """Create an audit alert and notify admins.

        Args:
            db: Active session; it is flushed but not committed here.
            audit_log: Log entry the alert is attached to (via its ``id``).
            alert_type: Alert category; also used in the notification title.
            message: Alert text. When empty or ``None`` a generic
                "Sensitive operation detected" message is used.

        Returns:
            The newly added ``AuditAlert``.
        """
        # Find all system admins.
        # ``== True`` builds a SQL expression here; it is not a Python
        # truthiness test, so it must not be "simplified".
        admins = db.query(User).filter(User.is_system_admin == True).all()  # noqa: E712
        recipient_ids = [admin.id for admin in admins]

        if not message:
            message = f"Sensitive operation detected: {alert_type}"

        alert = AuditAlert(
            id=str(uuid.uuid4()),
            audit_log_id=audit_log.id,
            alert_type=alert_type,
            recipients=recipient_ids,
            message=message,
        )
        db.add(alert)
        db.flush()

        # Send notifications to admins via NotificationService.
        # NOTE(review): imported inside the function, presumably to avoid a
        # circular import between the two service modules - confirm.
        from app.services.notification_service import NotificationService

        for admin in admins:
            NotificationService.create_notification(
                db=db,
                user_id=admin.id,
                notification_type="blocker",  # Using blocker type for high-priority alerts
                reference_type="audit_alert",
                reference_id=alert.id,
                title=f"Security Alert: {alert_type}",
                message=message,
            )

        return alert

    @staticmethod
    def check_bulk_delete(db: Session, user_id: Optional[str], now: datetime) -> None:
        """Check if user has exceeded bulk delete threshold.

        Counts the user's delete logs created at or after
        ``now - BULK_DELETE_WINDOW_MINUTES``. When the count is greater than
        ``BULK_DELETE_THRESHOLD``, a ``"bulk_delete"`` alert is attached to
        the user's most recent delete log.

        Args:
            db: Active session used for the queries.
            user_id: User to check; nothing happens when it is falsy.
            now: End of the time window, normally the timestamp of the
                delete that was just logged.
        """
        if not user_id:
            return

        # Match exactly what log_event stores in AuditLog.action
        # (``action.value``) rather than repeating a string literal.
        delete_action = AuditAction.DELETE.value
        window_start = now - timedelta(minutes=AuditService.BULK_DELETE_WINDOW_MINUTES)

        delete_count = db.query(AuditLog).filter(
            AuditLog.user_id == user_id,
            AuditLog.action == delete_action,
            AuditLog.created_at >= window_start,
        ).count()

        if delete_count <= AuditService.BULK_DELETE_THRESHOLD:
            return

        # Create a bulk delete alert
        # Get the most recent delete log to attach the alert
        recent_log = db.query(AuditLog).filter(
            AuditLog.user_id == user_id,
            AuditLog.action == delete_action,
        ).order_by(AuditLog.created_at.desc()).first()

        if recent_log:
            AuditService.create_alert(
                db,
                recent_log,
                "bulk_delete",
                f"User performed {delete_count} delete operations in {AuditService.BULK_DELETE_WINDOW_MINUTES} minutes"
            )

    @staticmethod
    def acknowledge_alert(
        db: Session,
        alert_id: str,
        user_id: str
    ) -> Optional[AuditAlert]:
        """Acknowledge an audit alert.

        Args:
            db: Active session; it is flushed but not committed here.
            alert_id: Id of the alert to acknowledge.
            user_id: Id of the user acknowledging it.

        Returns:
            The updated ``AuditAlert``, or ``None`` when no alert has that id.
        """
        alert = db.query(AuditAlert).filter(AuditAlert.id == alert_id).first()
        if not alert:
            return None

        alert.is_acknowledged = True
        alert.acknowledged_by = user_id
        alert.acknowledged_at = AuditService._utcnow()
        db.flush()
        return alert