""" Tool_OCR - Audit Log Service Handles security audit logging """ import logging from typing import Optional, List, Tuple from sqlalchemy.orm import Session from sqlalchemy import desc, and_ from datetime import datetime, timedelta import json from app.models.audit_log import AuditLog logger = logging.getLogger(__name__) class AuditService: """Service for security audit logging""" def log_event( self, db: Session, event_type: str, event_category: str, description: str, user_id: Optional[int] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None, resource_type: Optional[str] = None, resource_id: Optional[str] = None, success: bool = True, error_message: Optional[str] = None, metadata: Optional[dict] = None ) -> AuditLog: """ Log a security audit event Args: db: Database session event_type: Type of event (auth_login, task_create, etc.) event_category: Category (authentication, task, admin, system) description: Human-readable description user_id: User who performed action (optional) ip_address: Client IP address (optional) user_agent: Client user agent (optional) resource_type: Type of affected resource (optional) resource_id: ID of affected resource (optional) success: Whether the action succeeded error_message: Error details if failed (optional) metadata: Additional JSON metadata (optional) Returns: Created AuditLog object """ # Convert metadata to JSON string metadata_str = json.dumps(metadata) if metadata else None # Create audit log entry audit_log = AuditLog( user_id=user_id, event_type=event_type, event_category=event_category, description=description, ip_address=ip_address, user_agent=user_agent, resource_type=resource_type, resource_id=resource_id, success=1 if success else 0, error_message=error_message, metadata=metadata_str ) db.add(audit_log) db.commit() db.refresh(audit_log) # Log to application logger log_level = logging.INFO if success else logging.WARNING logger.log( log_level, f"Audit: [{event_category}] {event_type} - {description} " f"(user_id={user_id}, success={success})" ) return audit_log def get_logs( self, db: Session, user_id: Optional[int] = None, event_category: Optional[str] = None, event_type: Optional[str] = None, date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, success_only: Optional[bool] = None, skip: int = 0, limit: int = 100 ) -> Tuple[List[AuditLog], int]: """ Get audit logs with filtering Args: db: Database session user_id: Filter by user ID (optional) event_category: Filter by category (optional) event_type: Filter by event type (optional) date_from: Filter from date (optional) date_to: Filter to date (optional) success_only: Filter by success status (optional) skip: Pagination offset limit: Pagination limit Returns: Tuple of (logs list, total count) """ # Base query query = db.query(AuditLog) # Apply filters if user_id is not None: query = query.filter(AuditLog.user_id == user_id) if event_category: query = query.filter(AuditLog.event_category == event_category) if event_type: query = query.filter(AuditLog.event_type == event_type) if date_from: query = query.filter(AuditLog.created_at >= date_from) if date_to: date_to_end = date_to + timedelta(days=1) query = query.filter(AuditLog.created_at < date_to_end) if success_only is not None: query = query.filter(AuditLog.success == (1 if success_only else 0)) # Get total count total = query.count() # Apply sorting and pagination logs = query.order_by(desc(AuditLog.created_at)).offset(skip).limit(limit).all() return logs, total def get_user_activity_summary( self, db: Session, user_id: int, days: int = 30 ) -> dict: """ Get user activity summary for the last N days Args: db: Database session user_id: User ID days: Number of days to look back Returns: Dictionary with activity counts """ date_from = datetime.utcnow() - timedelta(days=days) # Get all user events in period logs = db.query(AuditLog).filter( and_( AuditLog.user_id == user_id, AuditLog.created_at >= date_from ) ).all() # Count by category summary = { "total_events": len(logs), "by_category": {}, "failed_attempts": 0, "last_login": None } for log in logs: # Count by category if log.event_category not in summary["by_category"]: summary["by_category"][log.event_category] = 0 summary["by_category"][log.event_category] += 1 # Count failures if not log.success: summary["failed_attempts"] += 1 # Track last login if log.event_type == "auth_login" and log.success: if not summary["last_login"] or log.created_at > summary["last_login"]: summary["last_login"] = log.created_at.isoformat() return summary # Singleton instance audit_service = AuditService()