Files
OCR/backend/app/services/audit_service.py
egg ad2b832fb6 feat: complete external auth V2 migration with advanced features
This commit implements comprehensive external Azure AD authentication
with complete task management, file download, and admin monitoring systems.

## Core Features Implemented (80% Complete)

### 1. Token Auto-Refresh Mechanism 
- Backend: POST /api/v2/auth/refresh endpoint
- Frontend: Auto-refresh 5 minutes before expiration
- Auto-retry on 401 errors with seamless token refresh

### 2. File Download System 
- Three format support: JSON / Markdown / PDF
- Endpoints: GET /api/v2/tasks/{id}/download/{format}
- File access control with ownership validation
- Frontend download buttons in TaskHistoryPage

### 3. Complete Task Management 
Backend Endpoints:
- POST /api/v2/tasks/{id}/start - Start task
- POST /api/v2/tasks/{id}/cancel - Cancel task
- POST /api/v2/tasks/{id}/retry - Retry failed task
- GET /api/v2/tasks - List with filters (status, filename, date range)
- GET /api/v2/tasks/stats - User statistics

Frontend Features:
- Status-based action buttons (Start/Cancel/Retry)
- Advanced search and filtering (status, filename, date range)
- Pagination and sorting
- Task statistics dashboard (5 stat cards)

### 4. Admin Monitoring System  (Backend)
Admin APIs:
- GET /api/v2/admin/stats - System statistics
- GET /api/v2/admin/users - User list with stats
- GET /api/v2/admin/users/top - User leaderboard
- GET /api/v2/admin/audit-logs - Audit log query system
- GET /api/v2/admin/audit-logs/user/{id}/summary

Admin Features:
- Email-based admin check (ymirliu@panjit.com.tw)
- Comprehensive system metrics (users, tasks, sessions, activity)
- Audit logging service for security tracking

### 5. User Isolation & Security 
- Row-level security on all task queries
- File access control with ownership validation
- Strict user_id filtering on all operations
- Session validation and expiry checking
- Admin privilege verification

## New Files Created

Backend:
- backend/app/models/user_v2.py - User model for external auth
- backend/app/models/task.py - Task model with user isolation
- backend/app/models/session.py - Session management
- backend/app/models/audit_log.py - Audit log model
- backend/app/services/external_auth_service.py - External API client
- backend/app/services/task_service.py - Task CRUD with isolation
- backend/app/services/file_access_service.py - File access control
- backend/app/services/admin_service.py - Admin operations
- backend/app/services/audit_service.py - Audit logging
- backend/app/routers/auth_v2.py - V2 auth endpoints
- backend/app/routers/tasks.py - Task management endpoints
- backend/app/routers/admin.py - Admin endpoints
- backend/alembic/versions/5e75a59fb763_*.py - DB migration

Frontend:
- frontend/src/services/apiV2.ts - Complete V2 API client
- frontend/src/types/apiV2.ts - V2 type definitions
- frontend/src/pages/TaskHistoryPage.tsx - Task history UI

Modified Files:
- backend/app/core/deps.py - Added get_current_admin_user_v2
- backend/app/main.py - Registered admin router
- frontend/src/pages/LoginPage.tsx - V2 login integration
- frontend/src/components/Layout.tsx - User display and logout
- frontend/src/App.tsx - Added /tasks route

## Documentation
- openspec/changes/.../PROGRESS_UPDATE.md - Detailed progress report

## Pending Items (20%)
1. Database migration execution for audit_logs table
2. Frontend admin dashboard page
3. Frontend audit log viewer

## Testing Status
- Manual testing:  Authentication flow verified
- Unit tests:  Pending
- Integration tests:  Pending

## Security Enhancements
-  User isolation (row-level security)
-  File access control
-  Token expiry validation
-  Admin privilege verification
-  Audit logging infrastructure
-  Token encryption (noted, low priority)
-  Rate limiting (noted, low priority)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-14 17:19:43 +08:00

198 lines
6.0 KiB
Python

"""
Tool_OCR - Audit Log Service
Handles security audit logging
"""
import logging
from typing import Optional, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import desc, and_
from datetime import datetime, timedelta
import json
from app.models.audit_log import AuditLog
logger = logging.getLogger(__name__)
class AuditService:
"""Service for security audit logging"""
def log_event(
self,
db: Session,
event_type: str,
event_category: str,
description: str,
user_id: Optional[int] = None,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
success: bool = True,
error_message: Optional[str] = None,
metadata: Optional[dict] = None
) -> AuditLog:
"""
Log a security audit event
Args:
db: Database session
event_type: Type of event (auth_login, task_create, etc.)
event_category: Category (authentication, task, admin, system)
description: Human-readable description
user_id: User who performed action (optional)
ip_address: Client IP address (optional)
user_agent: Client user agent (optional)
resource_type: Type of affected resource (optional)
resource_id: ID of affected resource (optional)
success: Whether the action succeeded
error_message: Error details if failed (optional)
metadata: Additional JSON metadata (optional)
Returns:
Created AuditLog object
"""
# Convert metadata to JSON string
metadata_str = json.dumps(metadata) if metadata else None
# Create audit log entry
audit_log = AuditLog(
user_id=user_id,
event_type=event_type,
event_category=event_category,
description=description,
ip_address=ip_address,
user_agent=user_agent,
resource_type=resource_type,
resource_id=resource_id,
success=1 if success else 0,
error_message=error_message,
metadata=metadata_str
)
db.add(audit_log)
db.commit()
db.refresh(audit_log)
# Log to application logger
log_level = logging.INFO if success else logging.WARNING
logger.log(
log_level,
f"Audit: [{event_category}] {event_type} - {description} "
f"(user_id={user_id}, success={success})"
)
return audit_log
def get_logs(
self,
db: Session,
user_id: Optional[int] = None,
event_category: Optional[str] = None,
event_type: Optional[str] = None,
date_from: Optional[datetime] = None,
date_to: Optional[datetime] = None,
success_only: Optional[bool] = None,
skip: int = 0,
limit: int = 100
) -> Tuple[List[AuditLog], int]:
"""
Get audit logs with filtering
Args:
db: Database session
user_id: Filter by user ID (optional)
event_category: Filter by category (optional)
event_type: Filter by event type (optional)
date_from: Filter from date (optional)
date_to: Filter to date (optional)
success_only: Filter by success status (optional)
skip: Pagination offset
limit: Pagination limit
Returns:
Tuple of (logs list, total count)
"""
# Base query
query = db.query(AuditLog)
# Apply filters
if user_id is not None:
query = query.filter(AuditLog.user_id == user_id)
if event_category:
query = query.filter(AuditLog.event_category == event_category)
if event_type:
query = query.filter(AuditLog.event_type == event_type)
if date_from:
query = query.filter(AuditLog.created_at >= date_from)
if date_to:
date_to_end = date_to + timedelta(days=1)
query = query.filter(AuditLog.created_at < date_to_end)
if success_only is not None:
query = query.filter(AuditLog.success == (1 if success_only else 0))
# Get total count
total = query.count()
# Apply sorting and pagination
logs = query.order_by(desc(AuditLog.created_at)).offset(skip).limit(limit).all()
return logs, total
def get_user_activity_summary(
self,
db: Session,
user_id: int,
days: int = 30
) -> dict:
"""
Get user activity summary for the last N days
Args:
db: Database session
user_id: User ID
days: Number of days to look back
Returns:
Dictionary with activity counts
"""
date_from = datetime.utcnow() - timedelta(days=days)
# Get all user events in period
logs = db.query(AuditLog).filter(
and_(
AuditLog.user_id == user_id,
AuditLog.created_at >= date_from
)
).all()
# Count by category
summary = {
"total_events": len(logs),
"by_category": {},
"failed_attempts": 0,
"last_login": None
}
for log in logs:
# Count by category
if log.event_category not in summary["by_category"]:
summary["by_category"][log.event_category] = 0
summary["by_category"][log.event_category] += 1
# Count failures
if not log.success:
summary["failed_attempts"] += 1
# Track last login
if log.event_type == "auth_login" and log.success:
if not summary["last_login"] or log.created_at > summary["last_login"]:
summary["last_login"] = log.created_at.isoformat()
return summary
# Singleton instance
audit_service = AuditService()