Files
TEMP_spec_system_V3/ldap_utils.py
beabigegg b9557250a4 1st
2025-08-27 18:03:54 +08:00

119 lines
4.6 KiB
Python

from ldap3 import Server, Connection, ALL, Tls
import ssl
from flask import current_app
def _domain_from_search_base(search_base):
    """Build a DNS domain from the ``dc`` components of an LDAP search base.

    ``"ou=Users,DC=example,DC=com"`` yields ``"example.com"``. The match on
    ``dc`` is case-insensitive. Returns an empty string when the base holds
    no ``dc`` components.
    """
    labels = []
    for rdn in search_base.split(','):
        key, _, value = rdn.partition('=')
        if key.strip().lower() == 'dc' and value.strip():
            labels.append(value.strip())
    return '.'.join(labels)


def _first_attr_value(entry, attr_name, default=None):
    """Return the first value of ``attr_name`` on an ldap3 entry as ``str``.

    Falls back to ``default`` when the attribute is missing or has no values,
    instead of stringifying an empty value list.
    """
    if attr_name not in entry:
        return default
    values = entry[attr_name].values
    return str(values[0]) if values else default


def authenticate_ldap_user(username, password):
    """
    Authenticate a user against the LDAP server by binding with their credentials.

    Args:
        username: Login name. If it contains no ``@`` it is expanded to a UPN
            using the ``dc`` components of ``LDAP_SEARCH_BASE``.
        password: The user's password.

    Returns:
        A dict with the keys ``dn``, ``email``, ``display_name`` and
        ``username`` on success; ``None`` when the credentials are empty, the
        bind fails, the user entry cannot be found, or an LDAP error occurs.
    """
    # Reject empty credentials before touching the directory: a simple bind
    # with an empty password is an unauthenticated bind (RFC 4513) and must
    # never count as a successful login.
    if not username or not password:
        return None

    # Local import keeps this block self-contained.
    from ldap3.utils.conv import escape_filter_chars

    config = current_app.config
    search_base = config['LDAP_SEARCH_BASE']

    if '@' in username:
        user_upn = username
    else:
        # Derive the UPN suffix from the search base; if it has no dc
        # components, fall back to the bare username and let the bind decide.
        domain = _domain_from_search_base(search_base)
        user_upn = f"{username}@{domain}" if domain else username

    use_ssl = config['LDAP_USE_SSL']
    server_options = {
        'host': config['LDAP_SERVER'],
        'port': config['LDAP_PORT'],
        'use_ssl': use_ssl,
    }
    if use_ssl:
        # NOTE(security): CERT_NONE disables server certificate verification,
        # which leaves the bind open to man-in-the-middle attacks. Kept as-is
        # for compatibility with the current deployment; consider
        # ssl.CERT_REQUIRED together with a CA bundle.
        server_options['tls'] = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
    server = Server(**server_options, get_info=ALL)

    conn = None
    try:
        # Binding with the user's own credentials is the authentication step.
        conn = Connection(server, user=user_upn, password=password, auto_bind=True)
        if not conn.bound:
            return None

        login_attr = config['LDAP_USER_LOGIN_ATTR']
        # Escape the user-supplied value so it cannot alter the filter.
        search_filter = f'({login_attr}={escape_filter_chars(user_upn)})'
        # The DN is read from entry.entry_dn, so it is not requested as an attribute.
        conn.search(search_base, search_filter,
                    attributes=['mail', 'displayName', 'sAMAccountName'])
        if not conn.entries:
            # Unlikely after a successful bind, but possible if the login
            # attribute does not match the UPN.
            return None

        entry = conn.entries[0]
        return {
            'dn': entry.entry_dn,
            'email': _first_attr_value(entry, 'mail'),
            'display_name': _first_attr_value(entry, 'displayName'),
            'username': _first_attr_value(entry, 'sAMAccountName', username),
        }
    except Exception as e:
        # Boundary handler: bad credentials and connectivity problems both
        # surface here and are reported to the caller as a failed login.
        current_app.logger.warning("LDAP authentication error: %s", e)
        return None
    finally:
        # Always release the connection, including when the search raised.
        if conn is not None and conn.bound:
            conn.unbind()
def get_ldap_group_members(group_name):
    """
    Retrieve the email addresses of the members of an LDAP group.

    Binds with the application's service account (``LDAP_BIND_USER_DN`` /
    ``LDAP_BIND_USER_PASSWORD``), looks the group up by ``cn`` under
    ``LDAP_SEARCH_BASE``, then reads the ``mail`` attribute of every member DN.

    Args:
        group_name: The ``cn`` of the group to look up.

    Returns:
        A list of email address strings. Members without a ``mail`` value are
        skipped. An empty list is returned when ``group_name`` is empty, the
        group is not found, the bind fails, or an LDAP error occurs.
    """
    # An empty name can never match a group; skip the round trip.
    if not group_name:
        return []

    # Local imports keep this block self-contained.
    from ldap3 import BASE
    from ldap3.utils.conv import escape_filter_chars

    config = current_app.config
    use_ssl = config['LDAP_USE_SSL']
    bind_dn = config['LDAP_BIND_USER_DN']
    bind_password = config['LDAP_BIND_USER_PASSWORD']
    search_base = config['LDAP_SEARCH_BASE']

    server_options = {
        'host': config['LDAP_SERVER'],
        'port': config['LDAP_PORT'],
        'use_ssl': use_ssl,
    }
    if use_ssl:
        # NOTE(security): CERT_NONE disables server certificate verification;
        # the service-account password is then exposed to man-in-the-middle
        # attacks. Kept as-is for compatibility; consider ssl.CERT_REQUIRED.
        server_options['tls'] = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
    server = Server(**server_options, get_info=ALL)

    logger = current_app.logger
    conn = None
    try:
        # Bind with the service account.
        conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
        if not conn.bound:
            logger.error("Failed to bind to LDAP with service account.")
            return []

        # Escape the caller-supplied name so it cannot alter the filter.
        group_filter = f'(&(objectClass=group)(cn={escape_filter_chars(group_name)}))'
        conn.search(search_base, group_filter, attributes=['member'])
        if not conn.entries:
            logger.warning("LDAP group '%s' not found.", group_name)
            return []

        group_entry = conn.entries[0]
        # Copy the DNs: conn.entries is replaced by every search below.
        member_dns = list(group_entry.member.values) if 'member' in group_entry else []

        emails = []
        for member_dn in member_dns:
            # Base-scope search reads exactly the member's own entry.
            conn.search(member_dn, '(objectClass=*)', search_scope=BASE, attributes=['mail'])
            if not conn.entries:
                continue
            member_entry = conn.entries[0]
            mail_values = member_entry.mail.values if 'mail' in member_entry else []
            if mail_values:
                emails.append(str(mail_values[0]))
        return emails
    except Exception as e:
        # Boundary handler: callers treat any LDAP failure as "no members".
        logger.error("LDAP group search error: %s", e)
        return []
    finally:
        # Always release the connection, including when a search raised.
        if conn is not None and conn.bound:
            conn.unbind()