266 lines
8.8 KiB
Python
266 lines
8.8 KiB
Python
import time
|
|
from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
|
|
from flask import current_app
|
|
from utils.logger import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
def create_ldap_connection(retries=3):
|
|
"""Create LDAP connection with retry mechanism"""
|
|
config = current_app.config
|
|
|
|
for attempt in range(retries):
|
|
try:
|
|
server = Server(
|
|
config['LDAP_SERVER'],
|
|
port=config['LDAP_PORT'],
|
|
use_ssl=config['LDAP_USE_SSL'],
|
|
get_info=ALL_ATTRIBUTES
|
|
)
|
|
|
|
conn = Connection(
|
|
server,
|
|
user=config['LDAP_BIND_USER_DN'],
|
|
password=config['LDAP_BIND_USER_PASSWORD'],
|
|
auto_bind=True,
|
|
raise_exceptions=True
|
|
)
|
|
|
|
logger.info("LDAP connection established successfully")
|
|
return conn
|
|
|
|
except Exception as e:
|
|
logger.error(f"LDAP connection attempt {attempt + 1} failed: {str(e)}")
|
|
if attempt == retries - 1:
|
|
raise
|
|
time.sleep(1)
|
|
|
|
return None
|
|
|
|
def authenticate_user(username, password):
|
|
"""Authenticate user against LDAP/AD"""
|
|
try:
|
|
conn = create_ldap_connection()
|
|
if not conn:
|
|
return None
|
|
|
|
config = current_app.config
|
|
search_filter = f"(&(objectClass=person)(objectCategory=person)({config['LDAP_USER_LOGIN_ATTR']}={username}))"
|
|
|
|
# Search for user
|
|
conn.search(
|
|
config['LDAP_SEARCH_BASE'],
|
|
search_filter,
|
|
SUBTREE,
|
|
attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName']
|
|
)
|
|
|
|
if not conn.entries:
|
|
logger.warning(f"User not found: {username}")
|
|
return None
|
|
|
|
user_entry = conn.entries[0]
|
|
user_dn = user_entry.entry_dn
|
|
|
|
# Try to bind with user credentials
|
|
try:
|
|
user_conn = Connection(
|
|
conn.server,
|
|
user=user_dn,
|
|
password=password,
|
|
auto_bind=True,
|
|
raise_exceptions=True
|
|
)
|
|
user_conn.unbind()
|
|
|
|
# Return user info
|
|
user_info = {
|
|
'ad_account': str(user_entry.sAMAccountName) if user_entry.sAMAccountName else username,
|
|
'display_name': str(user_entry.displayName) if user_entry.displayName else username,
|
|
'email': str(user_entry.mail) if user_entry.mail else '',
|
|
'user_principal_name': str(user_entry.userPrincipalName) if user_entry.userPrincipalName else username
|
|
}
|
|
|
|
logger.info(f"User authenticated successfully: {username}")
|
|
return user_info
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Authentication failed for user {username}: {str(e)}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"LDAP authentication error: {str(e)}")
|
|
return None
|
|
finally:
|
|
if conn:
|
|
conn.unbind()
|
|
|
|
def search_ldap_principals(search_term, limit=20):
|
|
"""Search for LDAP users and groups"""
|
|
try:
|
|
conn = create_ldap_connection()
|
|
if not conn:
|
|
return []
|
|
|
|
config = current_app.config
|
|
|
|
# Build search filter for active users
|
|
search_filter = f"""(&
|
|
(objectClass=person)
|
|
(objectCategory=person)
|
|
(!(userAccountControl:1.2.840.113556.1.4.803:=2))
|
|
(|
|
|
(displayName=*{search_term}*)
|
|
(mail=*{search_term}*)
|
|
(sAMAccountName=*{search_term}*)
|
|
(userPrincipalName=*{search_term}*)
|
|
)
|
|
)"""
|
|
|
|
# Remove extra whitespace
|
|
search_filter = ' '.join(search_filter.split())
|
|
|
|
conn.search(
|
|
config['LDAP_SEARCH_BASE'],
|
|
search_filter,
|
|
SUBTREE,
|
|
attributes=['sAMAccountName', 'displayName', 'mail'],
|
|
size_limit=limit
|
|
)
|
|
|
|
results = []
|
|
for entry in conn.entries:
|
|
results.append({
|
|
'ad_account': str(entry.sAMAccountName) if entry.sAMAccountName else '',
|
|
'display_name': str(entry.displayName) if entry.displayName else '',
|
|
'email': str(entry.mail) if entry.mail else ''
|
|
})
|
|
|
|
logger.info(f"LDAP search found {len(results)} results for term: {search_term}")
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"LDAP search error: {str(e)}")
|
|
return []
|
|
finally:
|
|
if conn:
|
|
conn.unbind()
|
|
|
|
def get_user_info(ad_account):
|
|
"""Get user information from LDAP"""
|
|
try:
|
|
conn = create_ldap_connection()
|
|
if not conn:
|
|
return None
|
|
|
|
config = current_app.config
|
|
|
|
# 支援 sAMAccountName 和 userPrincipalName 格式
|
|
if '@' in ad_account:
|
|
# Email 格式,使用 userPrincipalName 或 mail 搜尋
|
|
search_filter = f"""(&
|
|
(objectClass=person)
|
|
(|
|
|
(userPrincipalName={ad_account})
|
|
(mail={ad_account})
|
|
)
|
|
)"""
|
|
else:
|
|
# 純帳號名稱,使用 sAMAccountName 搜尋
|
|
search_filter = f"(&(objectClass=person)(sAMAccountName={ad_account}))"
|
|
|
|
# 移除多餘的空白
|
|
search_filter = ' '.join(search_filter.split())
|
|
|
|
conn.search(
|
|
config['LDAP_SEARCH_BASE'],
|
|
search_filter,
|
|
SUBTREE,
|
|
attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName']
|
|
)
|
|
|
|
if not conn.entries:
|
|
return None
|
|
|
|
entry = conn.entries[0]
|
|
return {
|
|
'ad_account': str(entry.sAMAccountName) if entry.sAMAccountName else ad_account,
|
|
'display_name': str(entry.displayName) if entry.displayName else ad_account,
|
|
'email': str(entry.mail) if entry.mail else '',
|
|
'user_principal_name': str(entry.userPrincipalName) if entry.userPrincipalName else ''
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user info for {ad_account}: {str(e)}")
|
|
return None
|
|
finally:
|
|
if conn:
|
|
conn.unbind()
|
|
|
|
def validate_ad_accounts(ad_accounts):
|
|
"""Validate multiple AD accounts exist"""
|
|
try:
|
|
conn = create_ldap_connection()
|
|
if not conn:
|
|
return {}
|
|
|
|
config = current_app.config
|
|
valid_accounts = {}
|
|
|
|
for account in ad_accounts:
|
|
# 支援 sAMAccountName 和 userPrincipalName 格式
|
|
if '@' in account:
|
|
# Email 格式,使用 userPrincipalName 或 mail 搜尋
|
|
search_filter = f"""(&
|
|
(objectClass=person)
|
|
(|
|
|
(userPrincipalName={account})
|
|
(mail={account})
|
|
)
|
|
)"""
|
|
else:
|
|
# 純帳號名稱,使用 sAMAccountName 搜尋
|
|
search_filter = f"(&(objectClass=person)(sAMAccountName={account}))"
|
|
|
|
# 移除多餘的空白
|
|
search_filter = ' '.join(search_filter.split())
|
|
|
|
conn.search(
|
|
config['LDAP_SEARCH_BASE'],
|
|
search_filter,
|
|
SUBTREE,
|
|
attributes=['sAMAccountName', 'displayName', 'mail', 'userPrincipalName']
|
|
)
|
|
|
|
if conn.entries:
|
|
entry = conn.entries[0]
|
|
valid_accounts[account] = {
|
|
'ad_account': str(entry.sAMAccountName) if entry.sAMAccountName else account,
|
|
'display_name': str(entry.displayName) if entry.displayName else account,
|
|
'email': str(entry.mail) if entry.mail else '',
|
|
'user_principal_name': str(entry.userPrincipalName) if entry.userPrincipalName else ''
|
|
}
|
|
logger.info(f"Validated AD account: {account} -> {entry.sAMAccountName}")
|
|
else:
|
|
logger.warning(f"AD account not found: {account}")
|
|
|
|
return valid_accounts
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error validating AD accounts: {str(e)}")
|
|
return {}
|
|
finally:
|
|
if conn:
|
|
conn.unbind()
|
|
|
|
def test_ldap_connection():
|
|
"""Test LDAP connection for health check"""
|
|
try:
|
|
conn = create_ldap_connection(retries=1)
|
|
if conn:
|
|
conn.unbind()
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"LDAP connection test failed: {str(e)}")
|
|
return False |