485 lines
22 KiB
Python
485 lines
22 KiB
Python
from ldap3 import Server, Connection, ALL, Tls, SUBTREE
|
|
import ssl
|
|
from flask import current_app
|
|
|
|
def authenticate_ldap_user(username, password):
|
|
"""
|
|
Authenticates a user against the LDAP server using their credentials.
|
|
Returns a dictionary with user info upon success, otherwise None.
|
|
要求使用完整的UPN格式帳號 (例如: user@panjit.com.tw)
|
|
"""
|
|
# 驗證帳號格式必須包含 @ 符號
|
|
if '@' not in username:
|
|
current_app.logger.error(f"Invalid username format: {username}. Must use full UPN format (e.g., user@domain.com)")
|
|
return None
|
|
|
|
user_upn = username
|
|
|
|
ldap_server = current_app.config['LDAP_SERVER']
|
|
ldap_port = current_app.config['LDAP_PORT']
|
|
use_ssl = current_app.config['LDAP_USE_SSL']
|
|
|
|
server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
|
|
if use_ssl:
|
|
tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
|
|
server_options['tls'] = tls_config
|
|
|
|
server = Server(**server_options, get_info=ALL)
|
|
|
|
try:
|
|
print(f"[DEBUG] LDAP 連線資訊:")
|
|
print(f" - 伺服器: {ldap_server}:{ldap_port}")
|
|
print(f" - SSL: {use_ssl}")
|
|
print(f" - 使用者 UPN: {user_upn}")
|
|
current_app.logger.info(f"Connecting to LDAP server: {ldap_server}:{ldap_port}")
|
|
|
|
# Attempt to bind with the user's credentials to authenticate
|
|
print(f"[DEBUG] 嘗試 LDAP 連線綁定...")
|
|
conn = Connection(server, user=user_upn, password=password, auto_bind=True)
|
|
|
|
if conn.bound:
|
|
print(f"[DEBUG] LDAP 連線綁定成功!")
|
|
current_app.logger.info(f"LDAP bind successful for: {user_upn}")
|
|
|
|
# Authentication successful. Now, get user details.
|
|
search_base = current_app.config['LDAP_SEARCH_BASE']
|
|
login_attr = current_app.config['LDAP_USER_LOGIN_ATTR']
|
|
search_filter = f'({login_attr}={user_upn})'
|
|
|
|
current_app.logger.debug(f"LDAP search - Base: {search_base}, Filter: {search_filter}")
|
|
|
|
conn.search(search_base, search_filter, attributes=['mail', 'displayName', 'sAMAccountName'])
|
|
|
|
if conn.entries:
|
|
entry = conn.entries[0]
|
|
user_info = {
|
|
'dn': entry.entry_dn, # DN 直接從 entry 物件獲取
|
|
'email': str(entry.mail) if 'mail' in entry and entry.mail else None,
|
|
'display_name': str(entry.displayName) if 'displayName' in entry and entry.displayName else None,
|
|
'username': user_upn # 使用原始UPN作為username
|
|
}
|
|
print(f"[DEBUG] 使用者詳細資訊:")
|
|
print(f" - 顯示名稱: {user_info['display_name']}")
|
|
print(f" - Email: {user_info['email']}")
|
|
print(f" - 使用者名稱: {user_info['username']}")
|
|
print(f" - DN: {user_info['dn']}")
|
|
current_app.logger.info(f"User details retrieved: {user_info['display_name']} ({user_upn})")
|
|
conn.unbind()
|
|
return user_info
|
|
else:
|
|
# This case is unlikely if bind succeeded, but handle it just in case
|
|
current_app.logger.warning(f"LDAP bind successful but user not found in search: {user_upn}")
|
|
conn.unbind()
|
|
return None
|
|
else:
|
|
# Authentication failed
|
|
print(f"[DEBUG] LDAP 連線綁定失敗! 可能是帳號密碼錯誤")
|
|
current_app.logger.warning(f"LDAP bind failed for: {user_upn}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
# Log the exception with more detail
|
|
print(f"[DEBUG] LDAP 連線發生異常: {str(e)}")
|
|
print(f"[DEBUG] 伺服器設定 - {ldap_server}:{ldap_port}, SSL={use_ssl}")
|
|
current_app.logger.error(f"LDAP authentication error for {user_upn}: {str(e)}")
|
|
current_app.logger.error(f"LDAP server: {ldap_server}, Port: {ldap_port}, SSL: {use_ssl}")
|
|
return None
|
|
|
|
|
|
def get_ldap_group_members(group_name):
|
|
"""
|
|
Retrieves a list of email addresses for members of a given LDAP group or organizational unit.
|
|
Uses the application's bind credentials for searching.
|
|
Enhanced with detailed debugging.
|
|
"""
|
|
print(f"[GROUP DEBUG] 開始獲取群組成員: {group_name}")
|
|
|
|
ldap_server = current_app.config['LDAP_SERVER']
|
|
ldap_port = current_app.config['LDAP_PORT']
|
|
use_ssl = current_app.config['LDAP_USE_SSL']
|
|
bind_dn = current_app.config['LDAP_BIND_USER_DN']
|
|
bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']
|
|
search_base = current_app.config['LDAP_SEARCH_BASE']
|
|
|
|
print(f"[GROUP DEBUG] LDAP 設定:")
|
|
print(f"[GROUP DEBUG] - 伺服器: {ldap_server}:{ldap_port}")
|
|
print(f"[GROUP DEBUG] - 搜尋基底: {search_base}")
|
|
|
|
server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
|
|
if use_ssl:
|
|
tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
|
|
server_options['tls'] = tls_config
|
|
|
|
server = Server(**server_options, get_info=ALL)
|
|
|
|
try:
|
|
# Bind with the service account
|
|
conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
|
|
|
|
if conn.bound:
|
|
print(f"[GROUP DEBUG] LDAP 服務帳號連線成功")
|
|
|
|
# 嘗試多種搜尋方式
|
|
emails = []
|
|
|
|
# 1. 首先嘗試按 cn 搜尋群組
|
|
print(f"[GROUP DEBUG] 嘗試搜尋群組 (cn): {group_name}")
|
|
group_search_filter = f'(&(objectClass=group)(cn={group_name}))'
|
|
conn.search(search_base, group_search_filter, attributes=['member', 'mail'])
|
|
|
|
if conn.entries:
|
|
print(f"[GROUP DEBUG] 找到群組: {conn.entries[0].entry_dn}")
|
|
members_dn = conn.entries[0].member.values if 'member' in conn.entries[0] else []
|
|
print(f"[GROUP DEBUG] 群組有 {len(members_dn)} 個成員")
|
|
|
|
# 獲取成員郵件
|
|
for i, member_dn in enumerate(members_dn):
|
|
print(f"[GROUP DEBUG] 處理成員 {i+1}: {member_dn}")
|
|
try:
|
|
member_conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
|
|
member_conn.search(member_dn, '(objectClass=*)', attributes=['mail', 'sAMAccountName', 'displayName'])
|
|
if member_conn.entries and 'mail' in member_conn.entries[0]:
|
|
email = str(member_conn.entries[0].mail)
|
|
emails.append(email)
|
|
print(f"[GROUP DEBUG] 獲取成員郵件: {email}")
|
|
elif member_conn.entries:
|
|
# 如果沒有 mail 屬性,嘗試用 sAMAccountName 生成
|
|
sam = str(member_conn.entries[0].sAMAccountName) if 'sAMAccountName' in member_conn.entries[0] else None
|
|
if sam:
|
|
email = f"{sam}@panjit.com.tw"
|
|
emails.append(email)
|
|
print(f"[GROUP DEBUG] 生成成員郵件: {email}")
|
|
member_conn.unbind()
|
|
except Exception as member_error:
|
|
print(f"[GROUP DEBUG] 處理成員錯誤: {member_error}")
|
|
else:
|
|
# 2. 如果找不到群組,嘗試按 OU 搜尋組織單位
|
|
print(f"[GROUP DEBUG] 群組未找到,嘗試搜尋組織單位: {group_name}")
|
|
ou_search_filter = f'(&(objectClass=organizationalUnit)(|(ou=*{group_name}*)(name=*{group_name}*)))'
|
|
conn.search(search_base, ou_search_filter, attributes=['ou', 'name'])
|
|
|
|
if conn.entries:
|
|
print(f"[GROUP DEBUG] 找到組織單位: {conn.entries[0].entry_dn}")
|
|
# 搜尋組織單位下的所有用戶
|
|
ou_dn = conn.entries[0].entry_dn
|
|
user_conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
|
|
user_conn.search(
|
|
ou_dn,
|
|
'(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))',
|
|
attributes=['mail', 'sAMAccountName', 'displayName'],
|
|
search_scope=SUBTREE,
|
|
size_limit=50
|
|
)
|
|
|
|
print(f"[GROUP DEBUG] 在組織單位下找到 {len(user_conn.entries)} 個用戶")
|
|
for entry in user_conn.entries:
|
|
if 'mail' in entry:
|
|
email = str(entry.mail)
|
|
emails.append(email)
|
|
print(f"[GROUP DEBUG] 獲取用戶郵件: {email}")
|
|
elif 'sAMAccountName' in entry:
|
|
sam = str(entry.sAMAccountName)
|
|
email = f"{sam}@panjit.com.tw"
|
|
emails.append(email)
|
|
print(f"[GROUP DEBUG] 生成用戶郵件: {email}")
|
|
|
|
user_conn.unbind()
|
|
else:
|
|
print(f"[GROUP DEBUG] 找不到群組或組織單位: {group_name}")
|
|
|
|
conn.unbind()
|
|
print(f"[GROUP DEBUG] 最終獲取 {len(emails)} 個郵件地址")
|
|
return emails
|
|
else:
|
|
print("[GROUP ERROR] Failed to bind to LDAP with service account.")
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"LDAP group search error: {e}")
|
|
return []
|
|
|
|
|
|
def search_ldap_principals(search_term, limit=10):
|
|
"""
|
|
Searches for LDAP principals (users) based on a search term.
|
|
Returns a list of dictionaries with 'name' and 'email' keys.
|
|
Uses the application's bind credentials for searching.
|
|
"""
|
|
if not search_term or len(search_term.strip()) < 2:
|
|
print(f"[DEBUG] search_ldap_principals: 搜尋詞無效 '{search_term}'")
|
|
return []
|
|
|
|
print(f"[DEBUG] search_ldap_principals: 搜尋 '{search_term}'")
|
|
|
|
ldap_server = current_app.config['LDAP_SERVER']
|
|
ldap_port = current_app.config['LDAP_PORT']
|
|
use_ssl = current_app.config['LDAP_USE_SSL']
|
|
bind_dn = current_app.config['LDAP_BIND_USER_DN']
|
|
bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']
|
|
search_base = current_app.config['LDAP_SEARCH_BASE']
|
|
|
|
server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
|
|
if use_ssl:
|
|
tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
|
|
server_options['tls'] = tls_config
|
|
|
|
server = Server(**server_options, get_info=ALL)
|
|
|
|
try:
|
|
print(f"[DEBUG] LDAP 搜尋設定:")
|
|
print(f" - 伺服器: {ldap_server}:{ldap_port}")
|
|
print(f" - 搜尋基底: {search_base}")
|
|
print(f" - 服務帳號: {bind_dn}")
|
|
|
|
# Bind with the service account
|
|
conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
|
|
|
|
if conn.bound:
|
|
print(f"[DEBUG] LDAP 服務帳號連線成功")
|
|
|
|
# Search for users matching the search term in multiple attributes
|
|
search_filter = f'(&(objectClass=user)(objectCategory=person)(|(displayName=*{search_term}*)(sAMAccountName=*{search_term}*)(mail=*{search_term}*))(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
|
|
print(f"[DEBUG] LDAP 搜尋篩選器: {search_filter}")
|
|
|
|
conn.search(
|
|
search_base,
|
|
search_filter,
|
|
attributes=['displayName', 'mail', 'sAMAccountName'],
|
|
size_limit=limit
|
|
)
|
|
|
|
print(f"[DEBUG] LDAP 搜尋找到 {len(conn.entries)} 個條目")
|
|
|
|
results = []
|
|
for i, entry in enumerate(conn.entries):
|
|
display_name = str(entry.displayName) if 'displayName' in entry and entry.displayName else None
|
|
email = str(entry.mail) if 'mail' in entry and entry.mail else None
|
|
sam_account = str(entry.sAMAccountName) if 'sAMAccountName' in entry and entry.sAMAccountName else None
|
|
|
|
print(f"[DEBUG] 條目 {i+1}: {display_name}, {email}, {sam_account}")
|
|
|
|
# Include entries with display name, generate email if missing
|
|
if display_name:
|
|
# Generate email from SAM account if not provided
|
|
if not email and sam_account:
|
|
email = f"{sam_account}@panjit.com.tw"
|
|
|
|
result = {
|
|
'name': display_name,
|
|
'email': email or 'No Email',
|
|
'username': sam_account
|
|
}
|
|
results.append(result)
|
|
print(f"[DEBUG] 加入結果: {result}")
|
|
else:
|
|
print(f"[DEBUG] 跳過條目 (缺少顯示名稱)")
|
|
|
|
conn.unbind()
|
|
print(f"[DEBUG] 最終返回 {len(results)} 個結果")
|
|
return results
|
|
else:
|
|
print("[DEBUG] Failed to bind to LDAP with service account.")
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"[DEBUG] LDAP principal search error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return []
|
|
|
|
|
|
def search_ldap_groups(search_term, limit=10):
|
|
"""
|
|
搜尋 LDAP 群組 (組織) 用於郵件發送
|
|
Returns a list of dictionaries with 'name' and 'members' keys.
|
|
"""
|
|
if not search_term or len(search_term.strip()) < 2:
|
|
print(f"[DEBUG] search_ldap_groups: 搜尋詞無效 '{search_term}'")
|
|
return []
|
|
|
|
print(f"[DEBUG] search_ldap_groups: 搜尋群組 '{search_term}'")
|
|
|
|
ldap_server = current_app.config['LDAP_SERVER']
|
|
ldap_port = current_app.config['LDAP_PORT']
|
|
use_ssl = current_app.config['LDAP_USE_SSL']
|
|
bind_dn = current_app.config['LDAP_BIND_USER_DN']
|
|
bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']
|
|
search_base = current_app.config['LDAP_SEARCH_BASE']
|
|
|
|
server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
|
|
if use_ssl:
|
|
tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
|
|
server_options['tls'] = tls_config
|
|
|
|
server = Server(**server_options, get_info=ALL)
|
|
|
|
try:
|
|
print(f"[DEBUG] LDAP 群組搜尋設定:")
|
|
print(f" - 伺服器: {ldap_server}:{ldap_port}")
|
|
print(f" - 搜尋基底: {search_base}")
|
|
|
|
# Bind with the service account
|
|
conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
|
|
|
|
if conn.bound:
|
|
print(f"[DEBUG] LDAP 服務帳號連線成功")
|
|
|
|
# 先搜尋組織單位,再搜尋群組,分開處理避免複雜的篩選器語法
|
|
results = []
|
|
|
|
# 1. 搜尋組織單位
|
|
ou_filter = f'(&(objectClass=organizationalUnit)(|(ou=*{search_term}*)(name=*{search_term}*)))'
|
|
print(f"[DEBUG] LDAP 組織單位搜尋篩選器: {ou_filter}")
|
|
|
|
try:
|
|
conn.search(
|
|
search_base,
|
|
ou_filter,
|
|
attributes=['ou', 'name', 'mail'],
|
|
size_limit=limit//2
|
|
)
|
|
|
|
print(f"[DEBUG] 找到 {len(conn.entries)} 個組織單位")
|
|
|
|
for i, entry in enumerate(conn.entries):
|
|
ou = str(entry.ou) if 'ou' in entry and entry.ou else None
|
|
name = str(entry.name) if 'name' in entry and entry.name else None
|
|
group_mail = str(entry.mail) if 'mail' in entry and entry.mail else None
|
|
|
|
group_name = ou or name
|
|
|
|
if group_name:
|
|
# 計算組織單位下的成員數
|
|
member_count = 0
|
|
try:
|
|
# 使用新的連線來計算成員,避免覆蓋當前結果
|
|
temp_conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
|
|
if temp_conn.bound:
|
|
temp_conn.search(
|
|
entry.entry_dn,
|
|
'(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))',
|
|
attributes=['sAMAccountName'],
|
|
size_limit=100,
|
|
search_scope=SUBTREE
|
|
)
|
|
member_count = len(temp_conn.entries)
|
|
temp_conn.unbind()
|
|
except Exception as member_err:
|
|
print(f"[DEBUG] 計算組織成員數錯誤: {member_err}")
|
|
member_count = 0
|
|
|
|
result = {
|
|
'name': group_name,
|
|
'email': group_mail,
|
|
'type': 'organizationalUnit',
|
|
'member_count': member_count,
|
|
'dn': entry.entry_dn,
|
|
'members': []
|
|
}
|
|
results.append(result)
|
|
print(f"[DEBUG] 加入組織單位: {result}")
|
|
|
|
except Exception as e:
|
|
print(f"[DEBUG] 組織單位搜尋錯誤: {e}")
|
|
|
|
# 2. 搜尋群組
|
|
group_filter = f'(&(objectClass=group)(|(cn=*{search_term}*)(displayName=*{search_term}*)))'
|
|
print(f"[DEBUG] LDAP 群組搜尋篩選器: {group_filter}")
|
|
|
|
try:
|
|
conn.search(
|
|
search_base,
|
|
group_filter,
|
|
attributes=['cn', 'displayName', 'member', 'mail'],
|
|
size_limit=limit//2
|
|
)
|
|
|
|
print(f"[DEBUG] 找到 {len(conn.entries)} 個群組")
|
|
|
|
for i, entry in enumerate(conn.entries):
|
|
cn = str(entry.cn) if 'cn' in entry and entry.cn else None
|
|
display_name = str(entry.displayName) if 'displayName' in entry and entry.displayName else None
|
|
group_mail = str(entry.mail) if 'mail' in entry and entry.mail else None
|
|
members = entry.member.values if 'member' in entry else []
|
|
|
|
group_name = display_name or cn
|
|
|
|
print(f"[DEBUG] 群組 {i+1}: CN={cn}, DisplayName={display_name}")
|
|
print(f"[DEBUG] 群組名稱: {group_name}, 成員數: {len(members)}")
|
|
|
|
if group_name:
|
|
result = {
|
|
'name': group_name,
|
|
'email': group_mail,
|
|
'type': 'group',
|
|
'member_count': len(members),
|
|
'dn': entry.entry_dn,
|
|
'members': members[:5] # 只顯示前5個成員作為預覽
|
|
}
|
|
results.append(result)
|
|
print(f"[DEBUG] 加入群組結果: {result}")
|
|
|
|
except Exception as e:
|
|
print(f"[DEBUG] 群組搜尋錯誤: {e}")
|
|
|
|
conn.unbind()
|
|
print(f"[DEBUG] 最終返回 {len(results)} 個群組結果")
|
|
return results
|
|
else:
|
|
print("[DEBUG] Failed to bind to LDAP with service account.")
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"[DEBUG] LDAP group search error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return []
|
|
|
|
|
|
def get_group_member_emails(group_dn):
|
|
"""
|
|
獲取群組內所有成員的郵件地址
|
|
"""
|
|
ldap_server = current_app.config['LDAP_SERVER']
|
|
ldap_port = current_app.config['LDAP_PORT']
|
|
use_ssl = current_app.config['LDAP_USE_SSL']
|
|
bind_dn = current_app.config['LDAP_BIND_USER_DN']
|
|
bind_password = current_app.config['LDAP_BIND_USER_PASSWORD']
|
|
|
|
server_options = {'host': ldap_server, 'port': ldap_port, 'use_ssl': use_ssl}
|
|
if use_ssl:
|
|
tls_config = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
|
|
server_options['tls'] = tls_config
|
|
|
|
server = Server(**server_options, get_info=ALL)
|
|
|
|
try:
|
|
conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
|
|
|
|
if conn.bound:
|
|
# First get the group members
|
|
conn.search(group_dn, '(objectClass=*)', attributes=['member'])
|
|
|
|
if conn.entries and 'member' in conn.entries[0]:
|
|
members_dn = conn.entries[0].member.values
|
|
emails = []
|
|
|
|
# For each member DN, fetch their email
|
|
for member_dn in members_dn:
|
|
conn.search(member_dn, '(objectClass=*)', attributes=['mail'])
|
|
if conn.entries and 'mail' in conn.entries[0] and conn.entries[0].mail:
|
|
emails.append(str(conn.entries[0].mail))
|
|
|
|
conn.unbind()
|
|
return emails
|
|
|
|
conn.unbind()
|
|
return []
|
|
else:
|
|
print("Failed to bind to LDAP with service account.")
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"Get group member emails error: {e}")
|
|
return []
|