Files
TODO_list_system/backend/utils/ldap_utils.py
beabigegg b0c86302ff 1ST
2025-08-29 16:25:46 +08:00

230 lines
7.3 KiB
Python

import time
from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from flask import current_app
from utils.logger import get_logger
logger = get_logger(__name__)
def create_ldap_connection(retries=3):
    """Return a bound ldap3 ``Connection`` built from the Flask app config.

    Up to ``retries`` attempts are made, sleeping one second after every
    failed attempt except the last one. The settings ``LDAP_SERVER``,
    ``LDAP_PORT``, ``LDAP_USE_SSL``, ``LDAP_BIND_USER_DN`` and
    ``LDAP_BIND_USER_PASSWORD`` are read from ``current_app.config``.

    Returns:
        The bound connection, or ``None`` when ``retries`` is zero or
        negative (no attempt is made in that case).

    Raises:
        Exception: whatever the final failed attempt raised.
    """
    settings = current_app.config
    for attempt_no in range(1, retries + 1):
        try:
            # NOTE(review): ALL_ATTRIBUTES is the search-attribute wildcard;
            # ldap3 documents NONE/DSA/SCHEMA/ALL for get_info -- confirm
            # which value was intended here.
            ldap_server = Server(
                settings['LDAP_SERVER'],
                port=settings['LDAP_PORT'],
                use_ssl=settings['LDAP_USE_SSL'],
                get_info=ALL_ATTRIBUTES,
            )
            bound = Connection(
                ldap_server,
                user=settings['LDAP_BIND_USER_DN'],
                password=settings['LDAP_BIND_USER_PASSWORD'],
                auto_bind=True,
                raise_exceptions=True,
            )
            logger.info("LDAP connection established successfully")
            return bound
        except Exception as exc:
            logger.error(f"LDAP connection attempt {attempt_no} failed: {str(exc)}")
            # Out of attempts: let the caller see the real failure.
            if attempt_no == retries:
                raise
            time.sleep(1)
    return None
def authenticate_user(username, password):
    """Authenticate a user against LDAP/AD using search-then-bind.

    The service account connection looks the user up by the configured
    ``LDAP_USER_LOGIN_ATTR``; the first matching entry's DN is then used to
    bind with the supplied password.

    Args:
        username: Login name as typed by the user (untrusted input).
        password: Password as typed by the user (untrusted input).

    Returns:
        A dict with ``ad_account``, ``display_name``, ``email`` and
        ``user_principal_name`` on success; ``None`` when the credentials are
        empty, the user is not found, the bind fails, or any LDAP error
        occurs. No exception is propagated to the caller.
    """
    # Never send an empty password to the directory: a simple bind without a
    # password is an unauthenticated/anonymous bind (RFC 4513, 5.1.2) and can
    # succeed, which would log the user in without checking anything.
    if not username or not password:
        return None

    # Local import keeps this block self-contained; ldap3 is already a
    # dependency of this module.
    from ldap3.utils.conv import escape_filter_chars

    # Bound before the try so the finally clause can always inspect it, even
    # when create_ldap_connection() raises.
    conn = None
    try:
        conn = create_ldap_connection()
        if conn is None:
            return None

        config = current_app.config
        # Escape filter metacharacters so the login name cannot alter the
        # structure of the search filter (LDAP injection).
        safe_username = escape_filter_chars(username)
        search_filter = (
            "(&(objectClass=person)(objectCategory=person)"
            f"({config['LDAP_USER_LOGIN_ATTR']}={safe_username}))"
        )

        # Search for user
        conn.search(
            config['LDAP_SEARCH_BASE'],
            search_filter,
            SUBTREE,
            attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName']
        )
        if not conn.entries:
            logger.warning(f"User not found: {username}")
            return None

        user_entry = conn.entries[0]

        # Try to bind with the user's own credentials; a failed bind raises
        # because raise_exceptions=True.
        try:
            user_conn = Connection(
                conn.server,
                user=user_entry.entry_dn,
                password=password,
                auto_bind=True,
                raise_exceptions=True
            )
            user_conn.unbind()
        except Exception as e:
            logger.warning(f"Authentication failed for user {username}: {str(e)}")
            return None

        # Fall back to the typed username when the directory has no value.
        user_info = {
            'ad_account': str(user_entry.sAMAccountName) if user_entry.sAMAccountName else username,
            'display_name': str(user_entry.displayName) if user_entry.displayName else username,
            'email': str(user_entry.mail) if user_entry.mail else '',
            'user_principal_name': str(user_entry.userPrincipalName) if user_entry.userPrincipalName else username
        }
        logger.info(f"User authenticated successfully: {username}")
        return user_info
    except Exception as e:
        logger.error(f"LDAP authentication error: {str(e)}")
        return None
    finally:
        if conn is not None:
            conn.unbind()
def search_ldap_principals(search_term, limit=20):
    """Search the directory for enabled user accounts matching a term.

    The term is matched as a substring against ``displayName``, ``mail``,
    ``sAMAccountName`` and ``userPrincipalName``. Only person objects are
    searched (no groups), and accounts with the ACCOUNTDISABLE bit set in
    ``userAccountControl`` are excluded.

    Args:
        search_term: Free text typed by the user (untrusted input). It is
            matched literally; ``*`` and other filter metacharacters are
            escaped rather than interpreted.
        limit: Maximum number of entries requested from the server.

    Returns:
        A list of dicts with ``ad_account``, ``display_name`` and ``email``.
        An empty list is returned on any LDAP error.
    """
    # Local import keeps this block self-contained; ldap3 is already a
    # dependency of this module.
    from ldap3.utils.conv import escape_filter_chars

    # Bound before the try so the finally clause can always inspect it, even
    # when create_ldap_connection() raises.
    conn = None
    try:
        conn = create_ldap_connection()
        if conn is None:
            return []

        config = current_app.config
        # Escape filter metacharacters so the term cannot change the filter
        # structure (LDAP injection).
        term = escape_filter_chars(search_term)
        # 1.2.840.113556.1.4.803 is the AD bitwise-AND matching rule; bit 2 of
        # userAccountControl marks a disabled account.
        search_filter = (
            "(&"
            "(objectClass=person)"
            "(objectCategory=person)"
            "(!(userAccountControl:1.2.840.113556.1.4.803:=2))"
            "(|"
            f"(displayName=*{term}*)"
            f"(mail=*{term}*)"
            f"(sAMAccountName=*{term}*)"
            f"(userPrincipalName=*{term}*)"
            ")"
            ")"
        )
        conn.search(
            config['LDAP_SEARCH_BASE'],
            search_filter,
            SUBTREE,
            attributes=['sAMAccountName', 'displayName', 'mail'],
            size_limit=limit
        )
        results = [
            {
                'ad_account': str(entry.sAMAccountName) if entry.sAMAccountName else '',
                'display_name': str(entry.displayName) if entry.displayName else '',
                'email': str(entry.mail) if entry.mail else ''
            }
            for entry in conn.entries
        ]
        logger.info(f"LDAP search found {len(results)} results for term: {search_term}")
        return results
    except Exception as e:
        logger.error(f"LDAP search error: {str(e)}")
        return []
    finally:
        if conn is not None:
            conn.unbind()
def get_user_info(ad_account):
    """Look up a single user by ``sAMAccountName``.

    Args:
        ad_account: Account name to look up (treated as untrusted input).

    Returns:
        A dict with ``ad_account``, ``display_name`` and ``email`` taken from
        the first matching entry, or ``None`` when the account name is empty,
        nothing matches, or any LDAP error occurs.
    """
    # Nothing to look up; avoid opening a connection for an empty name.
    if not ad_account:
        return None

    # Local import keeps this block self-contained; ldap3 is already a
    # dependency of this module.
    from ldap3.utils.conv import escape_filter_chars

    # Bound before the try so the finally clause can always inspect it, even
    # when create_ldap_connection() raises.
    conn = None
    try:
        conn = create_ldap_connection()
        if conn is None:
            return None

        config = current_app.config
        # Escape filter metacharacters so the account name cannot change the
        # filter structure (LDAP injection).
        safe_account = escape_filter_chars(ad_account)
        search_filter = f"(&(objectClass=person)(sAMAccountName={safe_account}))"
        conn.search(
            config['LDAP_SEARCH_BASE'],
            search_filter,
            SUBTREE,
            attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName']
        )
        if not conn.entries:
            return None

        entry = conn.entries[0]
        # Fall back to the requested account name when the directory has no
        # value for an attribute.
        return {
            'ad_account': str(entry.sAMAccountName) if entry.sAMAccountName else ad_account,
            'display_name': str(entry.displayName) if entry.displayName else ad_account,
            'email': str(entry.mail) if entry.mail else ''
        }
    except Exception as e:
        logger.error(f"Error getting user info for {ad_account}: {str(e)}")
        return None
    finally:
        if conn is not None:
            conn.unbind()
def validate_ad_accounts(ad_accounts):
    """Check which of the given AD account names exist in the directory.

    One search per account is issued over a single connection.

    Args:
        ad_accounts: Iterable of ``sAMAccountName`` values (treated as
            untrusted input).

    Returns:
        A dict mapping each account name that was found to a dict with
        ``ad_account``, ``display_name`` and ``email``. Accounts that do not
        exist are omitted. An empty dict is returned for empty input or when
        any LDAP error occurs.
    """
    # Nothing to validate; avoid opening a connection for empty/None input.
    if not ad_accounts:
        return {}

    # Local import keeps this block self-contained; ldap3 is already a
    # dependency of this module.
    from ldap3.utils.conv import escape_filter_chars

    # Bound before the try so the finally clause can always inspect it, even
    # when create_ldap_connection() raises.
    conn = None
    try:
        conn = create_ldap_connection()
        if conn is None:
            return {}

        config = current_app.config
        valid_accounts = {}
        for account in ad_accounts:
            # Escape filter metacharacters so an account name cannot change
            # the filter structure (LDAP injection).
            safe_account = escape_filter_chars(account)
            search_filter = f"(&(objectClass=person)(sAMAccountName={safe_account}))"
            conn.search(
                config['LDAP_SEARCH_BASE'],
                search_filter,
                SUBTREE,
                attributes=['sAMAccountName', 'displayName', 'mail']
            )
            if not conn.entries:
                continue
            entry = conn.entries[0]
            # Keyed by the name as supplied by the caller.
            valid_accounts[account] = {
                'ad_account': str(entry.sAMAccountName) if entry.sAMAccountName else account,
                'display_name': str(entry.displayName) if entry.displayName else account,
                'email': str(entry.mail) if entry.mail else ''
            }
        return valid_accounts
    except Exception as e:
        logger.error(f"Error validating AD accounts: {str(e)}")
        return {}
    finally:
        if conn is not None:
            conn.unbind()
def test_ldap_connection():
    """Health check: report whether a single LDAP bind attempt succeeds.

    Makes exactly one connection attempt (no retries) and closes the
    connection again straight away.

    Returns:
        ``True`` when a connection was obtained and unbound, ``False`` when
        no connection was returned or any exception occurred.
    """
    try:
        probe = create_ldap_connection(retries=1)
        if not probe:
            return False
        probe.unbind()
    except Exception as exc:
        logger.error(f"LDAP connection test failed: {str(exc)}")
        return False
    return True