import time

from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from ldap3.utils.conv import escape_filter_chars
from flask import current_app

from utils.logger import get_logger

logger = get_logger(__name__)


def _attr_text(attribute, default=''):
    """Return ``str(attribute)`` when the attribute is truthy, else *default*."""
    return str(attribute) if attribute else default


def _collapse_whitespace(text):
    """Collapse every run of whitespace in *text* into a single space."""
    return ' '.join(text.split())


def _build_account_filter(account):
    """Build the person-lookup filter for one account identifier.

    An identifier containing ``@`` is matched against ``userPrincipalName``
    or ``mail``; anything else is matched against ``sAMAccountName``.
    The identifier is escaped before interpolation so that characters such
    as ``*``, ``(``, ``)`` and ``\\`` cannot alter the filter structure.
    """
    safe_account = escape_filter_chars(account)

    if '@' in account:
        # E-mail style identifier: search by userPrincipalName or mail.
        search_filter = f"""(&
            (objectClass=person)
            (|
                (userPrincipalName={safe_account})
                (mail={safe_account})
            )
        )"""
    else:
        # Plain account name: search by sAMAccountName.
        search_filter = f"(&(objectClass=person)(sAMAccountName={safe_account}))"

    # Remove the extra whitespace introduced by the multi-line template.
    return _collapse_whitespace(search_filter)


def create_ldap_connection(retries=3):
    """Create a bound LDAP connection, retrying on failure.

    Connection settings (``LDAP_SERVER``, ``LDAP_PORT``, ``LDAP_USE_SSL``,
    ``LDAP_BIND_USER_DN``, ``LDAP_BIND_USER_PASSWORD``) are read from the
    Flask ``current_app.config``.

    Args:
        retries: Maximum number of connection attempts. One second is
            slept between failed attempts.

    Returns:
        The bound ``Connection``; ``None`` only when *retries* is zero or
        negative, because no attempt is made in that case.

    Raises:
        Exception: Whatever the last attempt raised, once all attempts
            have failed.
    """
    config = current_app.config

    for attempt in range(retries):
        try:
            server = Server(
                config['LDAP_SERVER'],
                port=config['LDAP_PORT'],
                use_ssl=config['LDAP_USE_SSL'],
                # NOTE(review): ALL_ATTRIBUTES is ldap3's search-attribute
                # wildcard, not one of the get_info constants (NONE, DSA,
                # SCHEMA, ALL) -- confirm which was intended. Left unchanged
                # to preserve current behaviour.
                get_info=ALL_ATTRIBUTES
            )

            conn = Connection(
                server,
                user=config['LDAP_BIND_USER_DN'],
                password=config['LDAP_BIND_USER_PASSWORD'],
                auto_bind=True,
                raise_exceptions=True
            )

            logger.info("LDAP connection established successfully")
            return conn

        except Exception as e:
            logger.error(f"LDAP connection attempt {attempt + 1} failed: {str(e)}")
            if attempt == retries - 1:
                # Last attempt: surface the failure to the caller.
                raise
            time.sleep(1)

    return None


def authenticate_user(username, password):
    """Authenticate a user against LDAP/AD.

    The user is located with the service connection, then a second
    connection is bound with the user's DN and the supplied password to
    verify the credentials.

    Args:
        username: Login value matched against the attribute named by
            ``LDAP_USER_LOGIN_ATTR`` in the app config.
        password: The user's password.

    Returns:
        A dict with ``ad_account``, ``display_name``, ``email`` and
        ``user_principal_name`` on success; ``None`` when the credentials
        are blank, the user is not found, the bind fails, or any LDAP
        error occurs.
    """
    # Reject blank credentials up front: with an empty password ldap3 does
    # not attempt a simple bind, so a "successful" bind here would not
    # prove that the caller knows the user's password.
    if not username or not password:
        logger.warning(f"Authentication failed for user {username}: empty username or password")
        return None

    # Initialised before the try so the finally clause can always test it,
    # even when create_ldap_connection() raises.
    conn = None
    try:
        conn = create_ldap_connection()
        if not conn:
            return None

        config = current_app.config
        # Escape the user-supplied value so it cannot change the filter.
        safe_username = escape_filter_chars(username)
        search_filter = f"(&(objectClass=person)(objectCategory=person)({config['LDAP_USER_LOGIN_ATTR']}={safe_username}))"

        # Search for user
        conn.search(
            config['LDAP_SEARCH_BASE'],
            search_filter,
            SUBTREE,
            attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName']
        )

        if not conn.entries:
            logger.warning(f"User not found: {username}")
            return None

        user_entry = conn.entries[0]
        user_dn = user_entry.entry_dn

        # Try to bind with user credentials
        try:
            user_conn = Connection(
                conn.server,
                user=user_dn,
                password=password,
                auto_bind=True,
                raise_exceptions=True
            )
            user_conn.unbind()

            # Return user info
            user_info = {
                'ad_account': _attr_text(user_entry.sAMAccountName, username),
                'display_name': _attr_text(user_entry.displayName, username),
                'email': _attr_text(user_entry.mail),
                'user_principal_name': _attr_text(user_entry.userPrincipalName, username)
            }

            logger.info(f"User authenticated successfully: {username}")
            return user_info

        except Exception as e:
            logger.warning(f"Authentication failed for user {username}: {str(e)}")
            return None

    except Exception as e:
        logger.error(f"LDAP authentication error: {str(e)}")
        return None
    finally:
        if conn:
            conn.unbind()


def search_ldap_principals(search_term, limit=20):
    """Search for enabled LDAP person entries matching a term.

    The term is matched as a substring of ``displayName``, ``mail``,
    ``sAMAccountName`` or ``userPrincipalName``. Entries whose
    ``userAccountControl`` has the disabled bit (2) set are excluded.

    Args:
        search_term: Text to look for; it is escaped, so wildcard and
            filter metacharacters are treated literally.
        limit: Value passed to the search as ``size_limit``.

    Returns:
        A list of dicts with ``ad_account``, ``display_name`` and
        ``email``; an empty list on any error.
    """
    # Initialised before the try so the finally clause can always test it.
    conn = None
    try:
        conn = create_ldap_connection()
        if not conn:
            return []

        config = current_app.config
        # Escape the user-supplied value so it cannot change the filter.
        safe_term = escape_filter_chars(search_term)

        # Build search filter for active users
        search_filter = f"""(&
            (objectClass=person)
            (objectCategory=person)
            (!(userAccountControl:1.2.840.113556.1.4.803:=2))
            (|
                (displayName=*{safe_term}*)
                (mail=*{safe_term}*)
                (sAMAccountName=*{safe_term}*)
                (userPrincipalName=*{safe_term}*)
            )
        )"""

        # Remove extra whitespace
        search_filter = _collapse_whitespace(search_filter)

        conn.search(
            config['LDAP_SEARCH_BASE'],
            search_filter,
            SUBTREE,
            attributes=['sAMAccountName', 'displayName', 'mail'],
            size_limit=limit
        )

        results = [
            {
                'ad_account': _attr_text(entry.sAMAccountName),
                'display_name': _attr_text(entry.displayName),
                'email': _attr_text(entry.mail)
            }
            for entry in conn.entries
        ]

        logger.info(f"LDAP search found {len(results)} results for term: {search_term}")
        return results

    except Exception as e:
        logger.error(f"LDAP search error: {str(e)}")
        return []
    finally:
        if conn:
            conn.unbind()


def get_user_info(ad_account):
    """Get user information from LDAP.

    Args:
        ad_account: Either a plain account name (matched against
            ``sAMAccountName``) or an identifier containing ``@``
            (matched against ``userPrincipalName`` or ``mail``).

    Returns:
        A dict with ``ad_account``, ``display_name``, ``email`` and
        ``user_principal_name`` for the first matching entry; ``None``
        when nothing matches or any error occurs.
    """
    # Initialised before the try so the finally clause can always test it.
    conn = None
    try:
        conn = create_ldap_connection()
        if not conn:
            return None

        config = current_app.config

        # Supports both sAMAccountName and userPrincipalName formats.
        search_filter = _build_account_filter(ad_account)

        conn.search(
            config['LDAP_SEARCH_BASE'],
            search_filter,
            SUBTREE,
            attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName']
        )

        if not conn.entries:
            return None

        entry = conn.entries[0]
        return {
            'ad_account': _attr_text(entry.sAMAccountName, ad_account),
            'display_name': _attr_text(entry.displayName, ad_account),
            'email': _attr_text(entry.mail),
            'user_principal_name': _attr_text(entry.userPrincipalName)
        }

    except Exception as e:
        logger.error(f"Error getting user info for {ad_account}: {str(e)}")
        return None
    finally:
        if conn:
            conn.unbind()


def validate_ad_accounts(ad_accounts):
    """Validate that multiple AD accounts exist.

    All lookups share a single LDAP connection.

    Args:
        ad_accounts: Iterable of account identifiers, each either a plain
            account name or an identifier containing ``@``.

    Returns:
        A dict mapping each identifier that was found to a dict with
        ``ad_account``, ``display_name``, ``email`` and
        ``user_principal_name``. Identifiers that are not found are
        omitted. An empty dict is returned on any error, including an
        error part-way through the list.
    """
    # Initialised before the try so the finally clause can always test it.
    conn = None
    try:
        conn = create_ldap_connection()
        if not conn:
            return {}

        config = current_app.config
        valid_accounts = {}

        for account in ad_accounts:
            # Supports both sAMAccountName and userPrincipalName formats.
            search_filter = _build_account_filter(account)

            conn.search(
                config['LDAP_SEARCH_BASE'],
                search_filter,
                SUBTREE,
                attributes=['sAMAccountName', 'displayName', 'mail', 'userPrincipalName']
            )

            if conn.entries:
                entry = conn.entries[0]
                valid_accounts[account] = {
                    'ad_account': _attr_text(entry.sAMAccountName, account),
                    'display_name': _attr_text(entry.displayName, account),
                    'email': _attr_text(entry.mail),
                    'user_principal_name': _attr_text(entry.userPrincipalName)
                }
                logger.info(f"Validated AD account: {account} -> {entry.sAMAccountName}")
            else:
                logger.warning(f"AD account not found: {account}")

        return valid_accounts

    except Exception as e:
        logger.error(f"Error validating AD accounts: {str(e)}")
        return {}
    finally:
        if conn:
            conn.unbind()


def test_ldap_connection():
    """Test the LDAP connection for a health check.

    Makes a single connection attempt (no retries) and unbinds
    immediately when it succeeds.

    Returns:
        ``True`` when a connection was established, ``False`` otherwise.
    """
    try:
        conn = create_ldap_connection(retries=1)
        if conn:
            conn.unbind()
            return True
        return False
    except Exception as e:
        logger.error(f"LDAP connection test failed: {str(e)}")
        return False