"""Authentication endpoints: AD/LDAP login and JWT token management."""

from datetime import datetime, timedelta

from flask import Blueprint, current_app, jsonify, request
from flask_jwt_extended import (
    create_access_token,
    create_refresh_token,
    get_jwt,
    get_jwt_identity,
    jwt_required,
)

from models import db, TodoUserPref
from utils.logger import get_logger

auth_bp = Blueprint('auth', __name__)
logger = get_logger(__name__)


def _credentials_from_request():
    """Extract ``(username, password)`` from the JSON request body.

    Returns:
        A ``(username, password)`` tuple with the username stripped of
        surrounding whitespace. ``('', '')`` is returned when the body is
        missing, is not valid JSON, is not a JSON object, or when either
        field is not a string, so the caller can answer 400 instead of
        failing with an unexpected exception.
    """
    # silent=True: a missing/invalid JSON body yields None instead of raising.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return '', ''

    username = payload.get('username', '')
    password = payload.get('password', '')
    if not isinstance(username, str) or not isinstance(password, str):
        return '', ''

    return username.strip(), password


def _authenticate(username, password):
    """Authenticate against LDAP, or the mock backend when configured.

    The mock backend is used when ``USE_MOCK_LDAP`` is truthy in the app
    config. If the real LDAP backend raises, the mock backend is used as a
    fallback ONLY while the app runs in debug or testing mode; otherwise the
    exception propagates. Falling back unconditionally would let a directory
    outage hand authentication over to the development mock.

    Returns:
        Whatever the selected ``authenticate_user`` returns (a falsy value
        is treated by the caller as invalid credentials).

    Raises:
        Exception: any error raised by the selected backend, except real
            LDAP errors in debug/testing mode (see above).
    """
    if current_app.config.get('USE_MOCK_LDAP', False):
        from utils.mock_ldap import authenticate_user
        logger.info("Using Mock LDAP for development")
        return authenticate_user(username, password)

    try:
        # Imported inside the try so a missing LDAP dependency is handled
        # the same way as a runtime LDAP failure.
        from utils.ldap_utils import authenticate_user
        logger.info("Using real LDAP authentication")
        return authenticate_user(username, password)
    except Exception as e:
        if not (current_app.debug or current_app.testing):
            logger.error(f"LDAP authentication error: {str(e)}")
            raise
        logger.error(f"LDAP authentication error, falling back to mock: {str(e)}")
        from utils.mock_ldap import authenticate_user as mock_authenticate_user
        return mock_authenticate_user(username, password)


def _sync_user_pref(user_info):
    """Fetch the user's preference row, creating or refreshing it as needed.

    A new row is inserted on first login; an existing row has its email and
    display name updated when either differs from the directory values.

    Args:
        user_info: mapping with ``ad_account``, ``email`` and
            ``display_name`` keys, as returned by the authentication backend.

    Returns:
        The ``TodoUserPref`` row for the account.

    Raises:
        Exception: re-raised after rolling back if the commit fails.
    """
    ad_account = user_info['ad_account']
    email = user_info['email']
    display_name = user_info['display_name']

    user_pref = TodoUserPref.query.filter_by(ad_account=ad_account).first()
    try:
        if user_pref is None:
            user_pref = TodoUserPref(
                ad_account=ad_account,
                email=email,
                display_name=display_name,
            )
            db.session.add(user_pref)
            db.session.commit()
            logger.info(f"Created new user preference for: {ad_account}")
        elif user_pref.email != email or user_pref.display_name != display_name:
            # Update user info if changed
            user_pref.email = email
            user_pref.display_name = display_name
            # NOTE(review): naive UTC timestamp kept as-is; utcnow() is
            # deprecated since Python 3.12 - switch to an aware datetime once
            # the column's timezone handling is confirmed.
            user_pref.updated_at = datetime.utcnow()
            db.session.commit()
    except Exception:
        # Leave the session usable for whatever handles the error next.
        db.session.rollback()
        raise

    return user_pref


def _issue_access_token(identity, display_name, email):
    """Create an access token carrying the display name and email claims."""
    return create_access_token(
        identity=identity,
        additional_claims={
            'display_name': display_name,
            'email': email,
        },
    )


@auth_bp.route('/login', methods=['POST'])
def login():
    """AD/LDAP login.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        200 with access/refresh tokens and the user profile on success,
        400 when credentials are missing or the body is not a JSON object,
        401 when the backend rejects the credentials,
        500 on any unexpected error (including an LDAP backend failure).
    """
    try:
        username, password = _credentials_from_request()
        if not username or not password:
            return jsonify({'error': 'Username and password required'}), 400

        # Authenticate with LDAP (or mock for development)
        user_info = _authenticate(username, password)
        if not user_info:
            logger.warning(f"Failed login attempt for user: {username}")
            return jsonify({'error': 'Invalid credentials'}), 401

        ad_account = user_info['ad_account']

        # Get or create user preferences
        user_pref = _sync_user_pref(user_info)

        # Create tokens
        access_token = _issue_access_token(
            ad_account, user_info['display_name'], user_info['email']
        )
        refresh_token = create_refresh_token(identity=ad_account)

        logger.info(f"Successful login for user: {ad_account}")

        return jsonify({
            'access_token': access_token,
            'refresh_token': refresh_token,
            'user': {
                'ad_account': ad_account,
                'display_name': user_info['display_name'],
                'email': user_info['email'],
                'theme': user_pref.theme,
                'language': user_pref.language
            }
        }), 200

    except Exception as e:
        logger.error(f"Login error: {str(e)}")
        return jsonify({'error': 'Authentication failed'}), 500


@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Issue a new access token for the identity in a valid refresh token.

    Returns:
        200 with a fresh ``access_token``, 404 when no preference row exists
        for the identity, 500 on unexpected errors.
    """
    try:
        identity = get_jwt_identity()

        # Claims are rebuilt from the stored profile, not the old token.
        user_pref = TodoUserPref.query.filter_by(ad_account=identity).first()
        if not user_pref:
            return jsonify({'error': 'User not found'}), 404

        access_token = _issue_access_token(
            identity, user_pref.display_name, user_pref.email
        )
        return jsonify({'access_token': access_token}), 200

    except Exception as e:
        logger.error(f"Token refresh error: {str(e)}")
        return jsonify({'error': 'Token refresh failed'}), 500


@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
    """Log the logout event; the client is responsible for removing tokens.

    Returns:
        200 with a confirmation message, 500 on unexpected errors.
    """
    try:
        identity = get_jwt_identity()
        logger.info(f"User logged out: {identity}")

        # In production, you might want to blacklist the token here
        # For now, we'll rely on client-side token removal
        return jsonify({'message': 'Logged out successfully'}), 200

    except Exception as e:
        logger.error(f"Logout error: {str(e)}")
        return jsonify({'error': 'Logout failed'}), 500


@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
    """Get current user information.

    ``display_name`` and ``email`` come from the token claims when present,
    otherwise from the stored preference row.

    Returns:
        200 with the profile and preferences, 404 when no preference row
        exists for the identity, 500 on unexpected errors.
    """
    try:
        identity = get_jwt_identity()
        claims = get_jwt()

        user_pref = TodoUserPref.query.filter_by(ad_account=identity).first()
        if not user_pref:
            return jsonify({'error': 'User not found'}), 404

        return jsonify({
            'ad_account': identity,
            'display_name': claims.get('display_name', user_pref.display_name),
            'email': claims.get('email', user_pref.email),
            'preferences': user_pref.to_dict()
        }), 200

    except Exception as e:
        logger.error(f"Get current user error: {str(e)}")
        return jsonify({'error': 'Failed to get user information'}), 500


@auth_bp.route('/validate', methods=['GET'])
@jwt_required()
def validate_token():
    """Validate JWT token.

    Returns:
        200 with ``valid: True``, the identity and the token's claims;
        401 with ``valid: False`` if reading the token data fails.
    """
    try:
        identity = get_jwt_identity()
        claims = get_jwt()

        return jsonify({
            'valid': True,
            'identity': identity,
            'claims': claims
        }), 200

    except Exception as e:
        logger.error(f"Token validation error: {str(e)}")
        return jsonify({'valid': False}), 401