#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Authentication Routes for AI Meeting Assistant with LDAP support

Author: PANJIT IT Team
Created: 2024-09-18
"""

import secrets
from datetime import datetime, timedelta

from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import (
    create_access_token, create_refresh_token,
    jwt_required, get_jwt_identity, get_jwt
)

from models import db, User

auth_bp = Blueprint('auth', __name__, url_prefix='/api')

# The only login name that is granted the 'admin' role on LDAP sign-in.
_ADMIN_LOGIN = 'ymirliu@panjit.com.tw'

# Fixed password that earlier revisions of this module stored for every
# auto-provisioned LDAP user. It is a known value, so it is rotated away
# whenever it is still found on an account (see _sync_ldap_user).
_LEGACY_LDAP_PLACEHOLDER = 'ldap_user'

# Claims stored on the refresh token at login and copied onto every access
# token minted by /refresh, so profile data survives a token refresh.
_CARRIED_CLAIMS = ('display_name', 'email', 'auth_method')


def _read_credentials():
    """Extract ``(username, password)`` from the JSON request body.

    Returns ``(None, None)`` when the body is missing, is not valid JSON, is
    not a JSON object, or when either field is not a string, so that the
    caller can answer 400 instead of failing with an unhandled error.
    """
    # silent=True: a bad/missing body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return None, None

    username = data.get('username', '')
    password = data.get('password', '')
    if not isinstance(username, str) or not isinstance(password, str):
        return None, None

    return username.strip(), password


def _ldap_authenticate(username, password):
    """Try LDAP authentication; return the user-info mapping or ``None``.

    Any failure (including the LDAP helper module not being importable) is
    logged and reported as ``None`` so that the caller can fall back to
    local authentication.
    """
    try:
        # Imported lazily so a missing/broken LDAP helper only disables LDAP.
        from utils.ldap_utils import authenticate_user
        user_info = authenticate_user(username, password)
        current_app.logger.info(f"LDAP authentication attempted for: {username}")
        return user_info
    except Exception as e:
        current_app.logger.error(f"LDAP authentication error: {str(e)}")
        return None


def _sync_ldap_user(username, user_info):
    """Create or update the local ``User`` row for an LDAP-authenticated login.

    Args:
        username: The login name exactly as submitted by the client.
        user_info: Mapping returned by the LDAP helper; must contain
            ``'ad_account'`` and may contain ``'display_name'``.

    Returns:
        The persisted ``User`` with ``last_login`` refreshed.
    """
    ad_account = user_info['ad_account']
    ldap_name = user_info.get('display_name')
    is_admin = username.lower() == _ADMIN_LOGIN

    user = User.query.filter_by(username=ad_account).first()
    if not user:
        role = 'admin' if is_admin else 'user'
        display_name = f"{ad_account} {ldap_name}" if ldap_name else ad_account
        user = User(
            username=ad_account,
            display_name=display_name,
            role=role
        )
        # LDAP users never sign in with a local password. Store a random one
        # so the local-authentication fallback cannot be used to impersonate
        # them with a guessable placeholder.
        user.set_password(secrets.token_urlsafe(32))
        db.session.add(user)
        db.session.commit()
        current_app.logger.info(
            f"Created new LDAP user: {ad_account} ({display_name}) with role: {role}"
        )
    else:
        # Only fill in a display name when none has been stored yet.
        if ldap_name and not user.display_name:
            user.display_name = f"{ad_account} {ldap_name}"

        if is_admin and user.role != 'admin':
            user.role = 'admin'
            current_app.logger.info(f"Updated user {ad_account} to admin role")

        # Accounts provisioned by earlier revisions still carry the fixed
        # placeholder password; replace it now that LDAP has vouched for
        # the caller.
        # NOTE(review): accounts that never log in again keep the old hash;
        # a one-off rotation of existing rows is still advisable.
        if user.check_password(_LEGACY_LDAP_PLACEHOLDER):
            user.set_password(secrets.token_urlsafe(32))

    user.last_login = datetime.utcnow()
    db.session.commit()
    return user


def _issue_tokens(user, profile_claims):
    """Create an ``(access_token, refresh_token)`` pair for ``user``.

    ``profile_claims`` are added to the access token next to ``role`` and
    ``username`` and are also stored on the refresh token so that /refresh
    can carry them over to new access tokens.
    """
    identity = str(user.id)
    access_claims = {
        'role': user.role,
        'username': user.username,
        **profile_claims
    }
    access_token = create_access_token(
        identity=identity,
        additional_claims=access_claims
    )
    refresh_token = create_refresh_token(
        identity=identity,
        additional_claims=dict(profile_claims)
    )
    return access_token, refresh_token


@auth_bp.route('/login', methods=['POST'])
def login():
    """LDAP/AD Login with fallback to local authentication.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        200 with ``access_token``, ``refresh_token`` and a ``user`` object on
        success; 400 when credentials are missing or malformed; 401 when both
        LDAP and local authentication reject them; 500 on unexpected errors.
    """
    try:
        username, password = _read_credentials()
        # Both values must be non-empty before any backend is consulted.
        if not username or not password:
            return jsonify({'error': 'Username and password required'}), 400

        # Try LDAP authentication first
        user_info = _ldap_authenticate(username, password)
        if user_info:
            user = _sync_ldap_user(username, user_info)
            profile = {
                'display_name': user_info.get('display_name', user.username),
                'email': user_info.get('email', ''),
                'auth_method': 'ldap'
            }
            access_token, refresh_token = _issue_tokens(user, profile)
            current_app.logger.info(f"Successful LDAP login for user: {user.username}")
            return jsonify({
                'access_token': access_token,
                'refresh_token': refresh_token,
                'user': {
                    'id': user.id,
                    'username': user.username,
                    'role': user.role,
                    **profile
                }
            }), 200

        # Fall back to local database authentication
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            profile = {'auth_method': 'local'}
            access_token, refresh_token = _issue_tokens(user, profile)
            current_app.logger.info(f"Successful local login for user: {username}")
            return jsonify({
                'access_token': access_token,
                'refresh_token': refresh_token,
                'user': {
                    'id': user.id,
                    'username': user.username,
                    'role': user.role,
                    **profile
                }
            }), 200

        # Authentication failed
        current_app.logger.warning(f"Failed login attempt for user: {username}")
        return jsonify({'error': 'Invalid credentials'}), 401

    except Exception as e:
        # Discard any half-finished transaction so the session stays usable.
        db.session.rollback()
        current_app.logger.exception(f"Login error: {str(e)}")
        return jsonify({'error': 'Authentication failed'}), 500


@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Refresh access token.

    Role and username are re-read from the database; the profile claims
    stored on the refresh token at login (display name, email, auth method)
    are copied onto the new access token when present.

    Returns:
        200 with a new ``access_token``; 404 when the user no longer exists;
        500 on unexpected errors.
    """
    try:
        identity = get_jwt_identity()
        user = User.query.get(int(identity))

        if not user:
            return jsonify({'error': 'User not found'}), 404

        refresh_claims = get_jwt()
        claims = {
            'role': user.role,
            'username': user.username
        }
        # Refresh tokens issued before these claims were added simply have
        # none of the keys, in which case nothing extra is copied.
        claims.update({
            key: refresh_claims[key]
            for key in _CARRIED_CLAIMS
            if key in refresh_claims
        })

        access_token = create_access_token(
            identity=identity,
            additional_claims=claims
        )

        return jsonify({'access_token': access_token}), 200

    except Exception as e:
        current_app.logger.exception(f"Token refresh error: {str(e)}")
        return jsonify({'error': 'Token refresh failed'}), 500


@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
    """Logout (client should remove tokens).

    Only records the event; no token is revoked server-side.
    """
    try:
        identity = get_jwt_identity()
        current_app.logger.info(f"User logged out: {identity}")

        # In production, you might want to blacklist the token here
        # For now, we'll rely on client-side token removal
        return jsonify({'message': 'Logged out successfully'}), 200

    except Exception as e:
        current_app.logger.exception(f"Logout error: {str(e)}")
        return jsonify({'error': 'Logout failed'}), 500


@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
    """Get current user information.

    ``id``, ``username`` and ``role`` come from the database; ``display_name``,
    ``email`` and ``auth_method`` come from the access-token claims, with
    defaults when a claim is absent.
    """
    try:
        identity = get_jwt_identity()
        claims = get_jwt()
        user = User.query.get(int(identity))

        if not user:
            return jsonify({'error': 'User not found'}), 404

        return jsonify({
            'id': user.id,
            'username': user.username,
            'role': user.role,
            'display_name': claims.get('display_name', user.username),
            'email': claims.get('email', ''),
            'auth_method': claims.get('auth_method', 'local')
        }), 200

    except Exception as e:
        current_app.logger.exception(f"Get current user error: {str(e)}")
        return jsonify({'error': 'Failed to get user information'}), 500


@auth_bp.route('/validate', methods=['GET'])
@jwt_required()
def validate_token():
    """Validate JWT token and echo its identity, username and role claims."""
    try:
        identity = get_jwt_identity()
        claims = get_jwt()

        return jsonify({
            'valid': True,
            'identity': identity,
            'username': claims.get('username'),
            'role': claims.get('role')
        }), 200

    except Exception as e:
        current_app.logger.exception(f"Token validation error: {str(e)}")
        return jsonify({'valid': False}), 401


@auth_bp.route('/ldap-test', methods=['GET'])
@jwt_required()
def test_ldap():
    """Test LDAP connection (admin only).

    Returns:
        200 with ``ldap_connection`` set to ``'success'`` or ``'failed'`` and a
        UTC ISO-8601 ``timestamp``; 403 when the token's role claim is not
        ``'admin'``; 500 on unexpected errors.
    """
    try:
        claims = get_jwt()
        if claims.get('role') != 'admin':
            return jsonify({'error': 'Admin access required'}), 403

        from utils.ldap_utils import test_ldap_connection

        result = test_ldap_connection()
        return jsonify({
            'ldap_connection': 'success' if result else 'failed',
            'timestamp': datetime.utcnow().isoformat()
        }), 200

    except Exception as e:
        current_app.logger.exception(f"LDAP test error: {str(e)}")
        return jsonify({'error': 'LDAP test failed'}), 500