Files
TEMP_spec_system_V3/ldap_utils.py
beabigegg 4f7f46b07a 2ND
2025-08-28 08:59:46 +08:00

485 lines
22 KiB
Python

from ldap3 import Server, Connection, ALL, Tls, SUBTREE
import ssl
from flask import current_app
def authenticate_ldap_user(username, password):
    """Authenticate a user by binding to the LDAP server with their own credentials.

    The account must be supplied in full UPN form (for example
    ``user@panjit.com.tw``). The same string is used as the bind identity and
    as the value matched against ``LDAP_USER_LOGIN_ATTR`` when the user's
    details are looked up after a successful bind.

    Args:
        username: Account name in UPN form; must contain ``@``.
        password: The user's password; must be non-empty.

    Returns:
        A dict with the keys ``dn``, ``email``, ``display_name`` and
        ``username`` on success. ``None`` when the credentials are empty, the
        account is not in UPN form, the bind fails, the user cannot be found
        by the follow-up search, or any exception is raised while talking to
        the server.

    Raises:
        KeyError: If a required ``LDAP_*`` key is missing from the Flask
            configuration (the configuration is read outside the ``try``).
    """
    # Reject empty credentials before any network traffic. A simple bind that
    # carries a name but no password is an "unauthenticated bind" (RFC 4513)
    # and must never be treated as a successful login.
    if not username or not password:
        return None

    # The account must contain "@", i.e. be a full UPN.
    if '@' not in username:
        current_app.logger.error(f"Invalid username format: {username}. Must use full UPN format (e.g., user@domain.com)")
        return None

    # Function-scope import so this block does not depend on the file header.
    from ldap3.utils.conv import escape_filter_chars

    user_upn = username
    ldap_server = current_app.config['LDAP_SERVER']
    ldap_port = current_app.config['LDAP_PORT']
    use_ssl = current_app.config['LDAP_USE_SSL']

    server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
    if use_ssl:
        # NOTE(review): ssl.CERT_NONE means the server certificate is not
        # verified. Kept unchanged to preserve current deployment behavior.
        tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
        server_options['tls'] = tls_config
    server = Server(**server_options, get_info=ALL)

    try:
        print("[DEBUG] LDAP 連線資訊:")
        print(f" - 伺服器: {ldap_server}:{ldap_port}")
        print(f" - SSL: {use_ssl}")
        print(f" - 使用者 UPN: {user_upn}")
        current_app.logger.info(f"Connecting to LDAP server: {ldap_server}:{ldap_port}")

        # Binding with the user's own credentials is the authentication step.
        print("[DEBUG] 嘗試 LDAP 連線綁定...")
        conn = Connection(server, user=user_upn, password=password, auto_bind=True)
        try:
            if not conn.bound:
                print("[DEBUG] LDAP 連線綁定失敗! 可能是帳號密碼錯誤")
                current_app.logger.warning(f"LDAP bind failed for: {user_upn}")
                return None

            print("[DEBUG] LDAP 連線綁定成功!")
            current_app.logger.info(f"LDAP bind successful for: {user_upn}")

            # Authenticated; now fetch the user's details.
            search_base = current_app.config['LDAP_SEARCH_BASE']
            login_attr = current_app.config['LDAP_USER_LOGIN_ATTR']
            # Escape the user-supplied value so characters such as "*", "("
            # or ")" cannot change the structure of the filter.
            search_filter = f'({login_attr}={escape_filter_chars(user_upn)})'
            current_app.logger.debug(f"LDAP search - Base: {search_base}, Filter: {search_filter}")
            conn.search(search_base, search_filter, attributes=['mail', 'displayName', 'sAMAccountName'])

            if not conn.entries:
                # Unlikely after a successful bind, but handled just in case.
                current_app.logger.warning(f"LDAP bind successful but user not found in search: {user_upn}")
                return None

            entry = conn.entries[0]
            user_info = {
                'dn': entry.entry_dn,  # the DN comes straight from the entry object
                'email': str(entry.mail) if 'mail' in entry and entry.mail else None,
                'display_name': str(entry.displayName) if 'displayName' in entry and entry.displayName else None,
                'username': user_upn  # the original UPN is used as the username
            }
            print("[DEBUG] 使用者詳細資訊:")
            print(f" - 顯示名稱: {user_info['display_name']}")
            print(f" - Email: {user_info['email']}")
            print(f" - 使用者名稱: {user_info['username']}")
            print(f" - DN: {user_info['dn']}")
            current_app.logger.info(f"User details retrieved: {user_info['display_name']} ({user_upn})")
            return user_info
        finally:
            # Always release the connection, including when the search raises.
            conn.unbind()
    except Exception as e:
        # Any failure (connection, bind, search) is logged and reported as a
        # failed authentication.
        print(f"[DEBUG] LDAP 連線發生異常: {str(e)}")
        print(f"[DEBUG] 伺服器設定 - {ldap_server}:{ldap_port}, SSL={use_ssl}")
        current_app.logger.error(f"LDAP authentication error for {user_upn}: {str(e)}")
        current_app.logger.error(f"LDAP server: {ldap_server}, Port: {ldap_port}, SSL: {use_ssl}")
        return None
def get_ldap_group_members(group_name):
    """Return e-mail addresses for the members of an LDAP group or organizational unit.

    The lookup binds with the application's service account
    (``LDAP_BIND_USER_DN`` / ``LDAP_BIND_USER_PASSWORD``) and proceeds in two
    steps:

    1. Search for a ``group`` object whose ``cn`` equals ``group_name`` and
       resolve each DN in its ``member`` attribute.
    2. If no such group exists, search for an ``organizationalUnit`` whose
       ``ou`` or ``name`` contains ``group_name`` and collect the enabled
       users beneath the first match (at most 50).

    When an entry has no ``mail`` value, an address is built from its
    ``sAMAccountName`` with the ``@panjit.com.tw`` domain.

    Args:
        group_name: Group common name, or a fragment of an OU name.

    Returns:
        A list of e-mail address strings; empty when ``group_name`` is empty,
        nothing matches, the bind fails, or an exception is raised.

    Raises:
        KeyError: If a required ``LDAP_*`` key is missing from the Flask
            configuration.
    """
    print(f"[GROUP DEBUG] 開始獲取群組成員: {group_name}")
    if not group_name:
        # Without a name there is nothing to look up; skip the LDAP round trip.
        return []

    # Function-scope import so this block does not depend on the file header.
    from ldap3.utils.conv import escape_filter_chars

    ldap_server = current_app.config['LDAP_SERVER']
    ldap_port = current_app.config['LDAP_PORT']
    use_ssl = current_app.config['LDAP_USE_SSL']
    bind_dn = current_app.config['LDAP_BIND_USER_DN']
    bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']
    search_base = current_app.config['LDAP_SEARCH_BASE']

    print("[GROUP DEBUG] LDAP 設定:")
    print(f"[GROUP DEBUG] - 伺服器: {ldap_server}:{ldap_port}")
    print(f"[GROUP DEBUG] - 搜尋基底: {search_base}")

    server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
    if use_ssl:
        # NOTE(review): ssl.CERT_NONE means the server certificate is not
        # verified. Kept unchanged to preserve current deployment behavior.
        tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
        server_options['tls'] = tls_config
    server = Server(**server_options, get_info=ALL)

    # Escape the caller-supplied name so filter metacharacters in it cannot
    # alter the structure of the search filters below.
    safe_name = escape_filter_chars(str(group_name))

    try:
        # Bind with the service account.
        conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
        try:
            if not conn.bound:
                print("[GROUP ERROR] Failed to bind to LDAP with service account.")
                return []

            print("[GROUP DEBUG] LDAP 服務帳號連線成功")
            emails = []

            # 1. First try to find a group by cn.
            print(f"[GROUP DEBUG] 嘗試搜尋群組 (cn): {group_name}")
            group_search_filter = f'(&(objectClass=group)(cn={safe_name}))'
            conn.search(search_base, group_search_filter, attributes=['member', 'mail'])

            if conn.entries:
                group_entry = conn.entries[0]
                print(f"[GROUP DEBUG] 找到群組: {group_entry.entry_dn}")
                # Copy the DNs out before the connection is reused for the
                # per-member lookups, which replace conn.entries.
                members_dn = list(group_entry.member.values) if 'member' in group_entry else []
                print(f"[GROUP DEBUG] 群組有 {len(members_dn)} 個成員")

                # Resolve each member DN to an e-mail address.
                for i, member_dn in enumerate(members_dn):
                    print(f"[GROUP DEBUG] 處理成員 {i+1}: {member_dn}")
                    try:
                        conn.search(member_dn, '(objectClass=*)', attributes=['mail', 'sAMAccountName', 'displayName'])
                        if not conn.entries:
                            continue
                        member = conn.entries[0]
                        # Test the value as well as the key, as the other
                        # helpers in this file do: an attribute that is
                        # present but empty must fall through to the
                        # sAMAccountName fallback.
                        if 'mail' in member and member.mail:
                            email = str(member.mail)
                            emails.append(email)
                            print(f"[GROUP DEBUG] 獲取成員郵件: {email}")
                        elif 'sAMAccountName' in member and member.sAMAccountName:
                            # No mail value: build the address from sAMAccountName.
                            email = f"{member.sAMAccountName}@panjit.com.tw"
                            emails.append(email)
                            print(f"[GROUP DEBUG] 生成成員郵件: {email}")
                    except Exception as member_error:
                        # Best effort: one bad member must not abort the rest.
                        print(f"[GROUP DEBUG] 處理成員錯誤: {member_error}")
            else:
                # 2. No group found: fall back to an organizational unit.
                print(f"[GROUP DEBUG] 群組未找到,嘗試搜尋組織單位: {group_name}")
                ou_search_filter = f'(&(objectClass=organizationalUnit)(|(ou=*{safe_name}*)(name=*{safe_name}*)))'
                conn.search(search_base, ou_search_filter, attributes=['ou', 'name'])

                if conn.entries:
                    ou_dn = conn.entries[0].entry_dn
                    print(f"[GROUP DEBUG] 找到組織單位: {ou_dn}")
                    # Collect every enabled user beneath the OU. The bit-AND
                    # matching rule on userAccountControl excludes accounts
                    # with flag value 2 set.
                    conn.search(
                        ou_dn,
                        '(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))',
                        attributes=['mail', 'sAMAccountName', 'displayName'],
                        search_scope=SUBTREE,
                        size_limit=50
                    )
                    print(f"[GROUP DEBUG] 在組織單位下找到 {len(conn.entries)} 個用戶")
                    for entry in conn.entries:
                        if 'mail' in entry and entry.mail:
                            email = str(entry.mail)
                            emails.append(email)
                            print(f"[GROUP DEBUG] 獲取用戶郵件: {email}")
                        elif 'sAMAccountName' in entry and entry.sAMAccountName:
                            email = f"{entry.sAMAccountName}@panjit.com.tw"
                            emails.append(email)
                            print(f"[GROUP DEBUG] 生成用戶郵件: {email}")
                else:
                    print(f"[GROUP DEBUG] 找不到群組或組織單位: {group_name}")

            print(f"[GROUP DEBUG] 最終獲取 {len(emails)} 個郵件地址")
            return emails
        finally:
            # Always release the service-account connection.
            conn.unbind()
    except Exception as e:
        print(f"LDAP group search error: {e}")
        return []
def search_ldap_principals(search_term, limit=10):
    """Search LDAP for enabled user accounts matching a search term.

    The term is matched as a substring against ``displayName``,
    ``sAMAccountName`` and ``mail``. The search binds with the application's
    service account (``LDAP_BIND_USER_DN`` / ``LDAP_BIND_USER_PASSWORD``).

    Args:
        search_term: Text to look for; must contain at least two
            non-whitespace-trimmed characters.
        limit: Value passed to the search as ``size_limit``.

    Returns:
        A list of dicts with the keys ``name``, ``email`` and ``username``.
        Entries without a display name are skipped. When an entry has no
        mail value, the address is built from ``sAMAccountName`` with the
        ``@panjit.com.tw`` domain, or set to ``'No Email'`` if that is missing
        too. An empty list is returned for a too-short term, a failed bind,
        or any exception.

    Raises:
        KeyError: If a required ``LDAP_*`` key is missing from the Flask
            configuration.
    """
    if not search_term or len(search_term.strip()) < 2:
        print(f"[DEBUG] search_ldap_principals: 搜尋詞無效 '{search_term}'")
        return []

    # Function-scope import so this block does not depend on the file header.
    from ldap3.utils.conv import escape_filter_chars

    print(f"[DEBUG] search_ldap_principals: 搜尋 '{search_term}'")
    ldap_server = current_app.config['LDAP_SERVER']
    ldap_port = current_app.config['LDAP_PORT']
    use_ssl = current_app.config['LDAP_USE_SSL']
    bind_dn = current_app.config['LDAP_BIND_USER_DN']
    bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']
    search_base = current_app.config['LDAP_SEARCH_BASE']

    server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
    if use_ssl:
        # NOTE(review): ssl.CERT_NONE means the server certificate is not
        # verified. Kept unchanged to preserve current deployment behavior.
        tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
        server_options['tls'] = tls_config
    server = Server(**server_options, get_info=ALL)

    try:
        print("[DEBUG] LDAP 搜尋設定:")
        print(f" - 伺服器: {ldap_server}:{ldap_port}")
        print(f" - 搜尋基底: {search_base}")
        print(f" - 服務帳號: {bind_dn}")

        # Bind with the service account.
        conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
        try:
            if not conn.bound:
                print("[DEBUG] Failed to bind to LDAP with service account.")
                return []

            print("[DEBUG] LDAP 服務帳號連線成功")

            # The term is user input: escape it so "*", "(", ")" and "\" are
            # matched literally instead of rewriting the filter.
            safe_term = escape_filter_chars(search_term)
            # Match the term in several attributes; the bit-AND matching rule
            # on userAccountControl excludes accounts with flag value 2 set.
            search_filter = (
                '(&(objectClass=user)(objectCategory=person)'
                f'(|(displayName=*{safe_term}*)(sAMAccountName=*{safe_term}*)(mail=*{safe_term}*))'
                '(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
            )
            print(f"[DEBUG] LDAP 搜尋篩選器: {search_filter}")
            conn.search(
                search_base,
                search_filter,
                attributes=['displayName', 'mail', 'sAMAccountName'],
                size_limit=limit
            )
            print(f"[DEBUG] LDAP 搜尋找到 {len(conn.entries)} 個條目")

            results = []
            for i, entry in enumerate(conn.entries):
                display_name = str(entry.displayName) if 'displayName' in entry and entry.displayName else None
                email = str(entry.mail) if 'mail' in entry and entry.mail else None
                sam_account = str(entry.sAMAccountName) if 'sAMAccountName' in entry and entry.sAMAccountName else None
                print(f"[DEBUG] 條目 {i+1}: {display_name}, {email}, {sam_account}")

                # Only entries with a display name are returned.
                if not display_name:
                    print("[DEBUG] 跳過條目 (缺少顯示名稱)")
                    continue

                # Build the address from the SAM account when mail is missing.
                if not email and sam_account:
                    email = f"{sam_account}@panjit.com.tw"
                result = {
                    'name': display_name,
                    'email': email or 'No Email',
                    'username': sam_account
                }
                results.append(result)
                print(f"[DEBUG] 加入結果: {result}")

            print(f"[DEBUG] 最終返回 {len(results)} 個結果")
            return results
        finally:
            # Always release the service-account connection.
            conn.unbind()
    except Exception as e:
        print(f"[DEBUG] LDAP principal search error: {e}")
        import traceback
        traceback.print_exc()
        return []
def search_ldap_groups(search_term, limit=10):
    """Search LDAP for organizational units and groups, for use as mail recipients.

    Organizational units are searched first (substring match on ``ou`` or
    ``name``), then groups (substring match on ``cn`` or ``displayName``).
    Each of the two searches is given roughly half of ``limit``. The search
    binds with the application's service account.

    Args:
        search_term: Text to look for; must contain at least two characters
            after trimming whitespace.
        limit: Overall result budget, split between the two searches.

    Returns:
        A list of dicts with the keys ``name``, ``email``, ``type``
        (``'organizationalUnit'`` or ``'group'``), ``member_count``, ``dn``
        and ``members``. For groups ``members`` holds the first five member
        DNs as a preview; for organizational units it is an empty list and
        ``member_count`` is the number of enabled users found beneath the OU
        (that count search is capped at 100). An empty list is returned for a
        too-short term, a failed bind, or an exception outside the two
        individually guarded searches.

    Raises:
        KeyError: If a required ``LDAP_*`` key is missing from the Flask
            configuration.
    """
    if not search_term or len(search_term.strip()) < 2:
        print(f"[DEBUG] search_ldap_groups: 搜尋詞無效 '{search_term}'")
        return []

    # Function-scope import so this block does not depend on the file header.
    from ldap3.utils.conv import escape_filter_chars

    print(f"[DEBUG] search_ldap_groups: 搜尋群組 '{search_term}'")
    ldap_server = current_app.config['LDAP_SERVER']
    ldap_port = current_app.config['LDAP_PORT']
    use_ssl = current_app.config['LDAP_USE_SSL']
    bind_dn = current_app.config['LDAP_BIND_USER_DN']
    bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']
    search_base = current_app.config['LDAP_SEARCH_BASE']

    server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
    if use_ssl:
        # NOTE(review): ssl.CERT_NONE means the server certificate is not
        # verified. Kept unchanged to preserve current deployment behavior.
        tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
        server_options['tls'] = tls_config
    server = Server(**server_options, get_info=ALL)

    # The term is user input: escape it so filter metacharacters are matched
    # literally instead of rewriting the filters below.
    safe_term = escape_filter_chars(search_term)

    # Split the budget between the two searches. In LDAP a size limit of 0
    # means "no limit", so a positive limit must never round down to 0
    # (limit=1 previously produced an unlimited search). Non-positive limits
    # are passed through unchanged.
    per_type_limit = max(1, limit // 2) if limit > 0 else limit

    try:
        print("[DEBUG] LDAP 群組搜尋設定:")
        print(f" - 伺服器: {ldap_server}:{ldap_port}")
        print(f" - 搜尋基底: {search_base}")

        # Bind with the service account.
        conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
        try:
            if not conn.bound:
                print("[DEBUG] Failed to bind to LDAP with service account.")
                return []

            print("[DEBUG] LDAP 服務帳號連線成功")
            # OUs and groups are searched separately to keep each filter simple.
            results = []

            # 1. Organizational units.
            ou_filter = f'(&(objectClass=organizationalUnit)(|(ou=*{safe_term}*)(name=*{safe_term}*)))'
            print(f"[DEBUG] LDAP 組織單位搜尋篩選器: {ou_filter}")
            try:
                conn.search(
                    search_base,
                    ou_filter,
                    attributes=['ou', 'name', 'mail'],
                    size_limit=per_type_limit
                )
                print(f"[DEBUG] 找到 {len(conn.entries)} 個組織單位")

                for entry in conn.entries:
                    ou = str(entry.ou) if 'ou' in entry and entry.ou else None
                    name = str(entry.name) if 'name' in entry and entry.name else None
                    group_mail = str(entry.mail) if 'mail' in entry and entry.mail else None
                    group_name = ou or name
                    if not group_name:
                        continue

                    # Count the enabled users beneath this OU.
                    member_count = 0
                    try:
                        # A separate connection is used for the count so the
                        # OU result set being iterated is not overwritten.
                        temp_conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
                        try:
                            if temp_conn.bound:
                                temp_conn.search(
                                    entry.entry_dn,
                                    '(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))',
                                    attributes=['sAMAccountName'],
                                    size_limit=100,
                                    search_scope=SUBTREE
                                )
                                member_count = len(temp_conn.entries)
                        finally:
                            # Release the helper connection even if the count
                            # search raised.
                            temp_conn.unbind()
                    except Exception as member_err:
                        print(f"[DEBUG] 計算組織成員數錯誤: {member_err}")
                        member_count = 0

                    result = {
                        'name': group_name,
                        'email': group_mail,
                        'type': 'organizationalUnit',
                        'member_count': member_count,
                        'dn': entry.entry_dn,
                        'members': []
                    }
                    results.append(result)
                    print(f"[DEBUG] 加入組織單位: {result}")
            except Exception as e:
                # Best effort: a failed OU search must not prevent the group search.
                print(f"[DEBUG] 組織單位搜尋錯誤: {e}")

            # 2. Groups.
            group_filter = f'(&(objectClass=group)(|(cn=*{safe_term}*)(displayName=*{safe_term}*)))'
            print(f"[DEBUG] LDAP 群組搜尋篩選器: {group_filter}")
            try:
                conn.search(
                    search_base,
                    group_filter,
                    attributes=['cn', 'displayName', 'member', 'mail'],
                    size_limit=per_type_limit
                )
                print(f"[DEBUG] 找到 {len(conn.entries)} 個群組")

                for i, entry in enumerate(conn.entries):
                    cn = str(entry.cn) if 'cn' in entry and entry.cn else None
                    display_name = str(entry.displayName) if 'displayName' in entry and entry.displayName else None
                    group_mail = str(entry.mail) if 'mail' in entry and entry.mail else None
                    members = entry.member.values if 'member' in entry else []
                    group_name = display_name or cn
                    print(f"[DEBUG] 群組 {i+1}: CN={cn}, DisplayName={display_name}")
                    print(f"[DEBUG] 群組名稱: {group_name}, 成員數: {len(members)}")

                    if group_name:
                        result = {
                            'name': group_name,
                            'email': group_mail,
                            'type': 'group',
                            'member_count': len(members),
                            'dn': entry.entry_dn,
                            'members': members[:5]  # only the first 5 members, as a preview
                        }
                        results.append(result)
                        print(f"[DEBUG] 加入群組結果: {result}")
            except Exception as e:
                print(f"[DEBUG] 群組搜尋錯誤: {e}")

            print(f"[DEBUG] 最終返回 {len(results)} 個群組結果")
            return results
        finally:
            # Always release the service-account connection.
            conn.unbind()
    except Exception as e:
        print(f"[DEBUG] LDAP group search error: {e}")
        import traceback
        traceback.print_exc()
        return []
def get_group_member_emails(group_dn):
    """Return the e-mail addresses of all members of the group at ``group_dn``.

    The group entry's ``member`` attribute is read, then each member DN is
    looked up for its ``mail`` attribute. Members without a mail value are
    skipped. The lookups bind with the application's service account.

    Args:
        group_dn: Distinguished name of the group entry.

    Returns:
        A list of e-mail address strings; empty when ``group_dn`` is empty,
        the group has no ``member`` attribute, the bind fails, or an
        exception is raised.

    Raises:
        KeyError: If a required ``LDAP_*`` key is missing from the Flask
            configuration.
    """
    if not group_dn:
        # An empty DN does not identify a group; do not use it as a search base.
        return []

    ldap_server = current_app.config['LDAP_SERVER']
    ldap_port = current_app.config['LDAP_PORT']
    use_ssl = current_app.config['LDAP_USE_SSL']
    bind_dn = current_app.config['LDAP_BIND_USER_DN']
    bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']

    server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
    if use_ssl:
        # NOTE(review): ssl.CERT_NONE means the server certificate is not
        # verified. Kept unchanged to preserve current deployment behavior.
        tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
        server_options['tls'] = tls_config
    server = Server(**server_options, get_info=ALL)

    try:
        conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
        try:
            if not conn.bound:
                print("Failed to bind to LDAP with service account.")
                return []

            # First read the group's member list.
            conn.search(group_dn, '(objectClass=*)', attributes=['member'])
            if not (conn.entries and 'member' in conn.entries[0]):
                return []

            # Copy the DNs out before the connection is reused for the
            # per-member lookups, which replace conn.entries.
            members_dn = list(conn.entries[0].member.values)

            emails = []
            # Look up the mail attribute of every member DN.
            for member_dn in members_dn:
                conn.search(member_dn, '(objectClass=*)', attributes=['mail'])
                if conn.entries and 'mail' in conn.entries[0] and conn.entries[0].mail:
                    emails.append(str(conn.entries[0].mail))
            return emails
        finally:
            # Always release the connection, including when a search raises.
            conn.unbind()
    except Exception as e:
        print(f"Get group member emails error: {e}")
        return []