#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LDAP Authentication Utilities for AI Meeting Assistant

Author: PANJIT IT Team
Created: 2024-09-18
"""

import time

from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from ldap3.utils.conv import escape_filter_chars
from flask import current_app


def get_logger():
    """Return the logger of the current Flask application.

    Reads ``current_app``, so it must be called while a Flask application
    context is active.
    """
    return current_app.logger


def create_ldap_connection(retries=3):
    """Open a bound LDAP connection, retrying on failure.

    Connection settings are read from ``current_app.config``:
    ``LDAP_SERVER``, ``LDAP_PORT``, ``LDAP_USE_SSL``, ``LDAP_BIND_USER_DN``
    and ``LDAP_BIND_USER_PASSWORD``. Every failed attempt is logged, and the
    function sleeps one second before trying again.

    Args:
        retries: Maximum number of connection attempts.

    Returns:
        The bound ``Connection``. ``None`` is returned only when ``retries``
        is zero or negative, because no attempt is made in that case.

    Raises:
        Exception: The error raised by the final attempt, re-raised
            unchanged once all attempts are exhausted.
    """
    logger = get_logger()

    # LDAP configuration from the application config
    ldap_server = current_app.config.get('LDAP_SERVER', 'panjit.com.tw')
    ldap_port = current_app.config.get('LDAP_PORT', 389)
    use_ssl = current_app.config.get('LDAP_USE_SSL', False)
    bind_dn = current_app.config.get('LDAP_BIND_USER_DN', '')
    bind_password = current_app.config.get('LDAP_BIND_USER_PASSWORD', '')

    for attempt in range(retries):
        try:
            # NOTE(review): ALL_ATTRIBUTES is the '*' attribute selector, not
            # one of the Server ``get_info`` constants (NONE/DSA/SCHEMA/ALL).
            # Presumably ``ALL`` was intended; confirm before changing, since
            # that would alter what is fetched from the server on bind.
            server = Server(
                ldap_server,
                port=ldap_port,
                use_ssl=use_ssl,
                get_info=ALL_ATTRIBUTES
            )

            conn = Connection(
                server,
                user=bind_dn,
                password=bind_password,
                auto_bind=True,
                raise_exceptions=True
            )

            logger.info("LDAP connection established successfully")
            return conn

        except Exception as e:
            logger.error(f"LDAP connection attempt {attempt + 1} failed: {str(e)}")
            if attempt == retries - 1:
                # Out of attempts: let the caller see the real failure.
                raise
            time.sleep(1)

    return None


def authenticate_user(username, password):
    """Authenticate a user against LDAP/AD.

    The user is looked up under ``LDAP_SEARCH_BASE`` by the attribute named
    in ``LDAP_USER_LOGIN_ATTR``; the entry's DN is then used for a bind with
    the supplied password to verify the credentials.

    Args:
        username: Login value to match against the configured login attribute.
        password: Password to verify with a bind as the located user.

    Returns:
        A dict with the keys ``ad_account``, ``display_name``, ``email``,
        ``user_principal_name`` and ``username`` on success. ``None`` when
        the credentials are blank, the user is not found, the bind fails, or
        any LDAP error occurs (errors are logged, never raised).
    """
    logger = get_logger()

    # Reject blank credentials up front: an empty password must never reach
    # a simple bind (RFC 4513 treats it as an unauthenticated bind), and a
    # blank username cannot identify an account.
    if not username or not password:
        logger.warning(f"Authentication failed for user {username}: empty credentials")
        return None

    # Bind the name before the try block so the finally clause can test it
    # even when create_ldap_connection() raises.
    conn = None

    try:
        conn = create_ldap_connection()
        if not conn:
            return None

        # Configuration
        search_base = current_app.config.get('LDAP_SEARCH_BASE', 'DC=panjit,DC=com,DC=tw')
        login_attr = current_app.config.get('LDAP_USER_LOGIN_ATTR', 'userPrincipalName')

        # Escape filter metacharacters so user input such as '*' or ')('
        # is matched literally and cannot rewrite the search filter.
        safe_username = escape_filter_chars(username)

        # Search for user
        search_filter = f"(&(objectClass=person)(objectCategory=person)({login_attr}={safe_username}))"

        conn.search(
            search_base,
            search_filter,
            SUBTREE,
            attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName']
        )

        if not conn.entries:
            logger.warning(f"User not found: {username}")
            return None

        user_entry = conn.entries[0]
        user_dn = user_entry.entry_dn

        # Try to bind with user credentials
        try:
            user_conn = Connection(
                conn.server,
                user=user_dn,
                password=password,
                auto_bind=True,
                raise_exceptions=True
            )
            user_conn.unbind()

            # Return user info, falling back to the supplied username when
            # an attribute is empty.
            user_info = {
                'ad_account': str(user_entry.sAMAccountName) if user_entry.sAMAccountName else username,
                'display_name': str(user_entry.displayName) if user_entry.displayName else username,
                'email': str(user_entry.mail) if user_entry.mail else '',
                'user_principal_name': str(user_entry.userPrincipalName) if user_entry.userPrincipalName else username,
                'username': username
            }

            logger.info(f"User authenticated successfully: {username}")
            return user_info

        except Exception as e:
            logger.warning(f"Authentication failed for user {username}: {str(e)}")
            return None

    except Exception as e:
        logger.error(f"LDAP authentication error: {str(e)}")
        return None
    finally:
        # Always release the lookup connection if one was opened.
        if conn is not None:
            conn.unbind()


def test_ldap_connection():
    """Check whether an LDAP connection can be established (health check).

    Makes a single connection attempt and unbinds immediately on success.

    Returns:
        ``True`` if the connection was established, ``False`` otherwise.
        Failures are logged, never raised.
    """
    logger = get_logger()

    try:
        conn = create_ldap_connection(retries=1)
        if conn:
            conn.unbind()
            return True
        return False
    except Exception as e:
        logger.error(f"LDAP connection test failed: {str(e)}")
        return False