"""LDAP helpers: user authentication plus user/group/OU lookups via ldap3."""

from contextlib import contextmanager
import ssl

from flask import current_app
from ldap3 import ALL, BASE, SUBTREE, Connection, Server, Tls
from ldap3.utils.conv import escape_filter_chars

# Domain appended to a sAMAccountName when a directory entry has no mail value.
_FALLBACK_MAIL_DOMAIN = 'panjit.com.tw'

# User objects whose userAccountControl does not have bit value 2 set
# (Active Directory's "account disabled" flag, tested with the bitwise-AND
# matching rule 1.2.840.113556.1.4.803).
_ENABLED_USER_FILTER = (
    '(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
)


def _build_server():
    """Create the ldap3 ``Server`` described by the Flask app configuration.

    Reads ``LDAP_SERVER``, ``LDAP_PORT`` and ``LDAP_USE_SSL``; a missing key
    raises ``KeyError``.
    """
    cfg = current_app.config
    use_ssl = cfg['LDAP_USE_SSL']
    options = {
        'host': cfg['LDAP_SERVER'],
        'port': cfg['LDAP_PORT'],
        'use_ssl': use_ssl,
    }
    if use_ssl:
        # NOTE(review): validate=ssl.CERT_NONE disables verification of the
        # server certificate, so the TLS channel is not protected against
        # man-in-the-middle attacks. Kept as-is to preserve current behavior.
        options['tls'] = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
    return Server(**options, get_info=ALL)


@contextmanager
def _bound_connection(server, user, password):
    """Yield an auto-bound ldap3 ``Connection`` and always unbind it afterwards.

    An explicit ``finally`` is used instead of ``with Connection(...)``
    because the connection is already bound when the block is entered and
    must be released no matter how the block exits.
    """
    conn = Connection(server, user=user, password=password, auto_bind=True)
    try:
        yield conn
    finally:
        try:
            conn.unbind()
        except Exception as unbind_error:
            # A failed unbind must not discard results that were already built.
            current_app.logger.debug(f"LDAP unbind failed: {unbind_error}")


def _attr_str(entry, name):
    """Return attribute *name* of an ldap3 entry as ``str``; ``None`` if absent or empty."""
    if name not in entry:
        return None
    value = getattr(entry, name)
    return str(value) if value else None


def _resolve_email(entry):
    """Return ``(email, generated)`` for a directory entry.

    ``email`` is the entry's mail value, or ``<sAMAccountName>@<fallback
    domain>`` when mail is missing or empty, or ``None`` when neither is
    available. ``generated`` is True only for the fallback address.
    """
    mail = _attr_str(entry, 'mail')
    if mail:
        return mail, False
    sam = _attr_str(entry, 'sAMAccountName')
    if sam:
        return f"{sam}@{_FALLBACK_MAIL_DOMAIN}", True
    return None, False


def authenticate_ldap_user(username, password):
    """Authenticate a user by binding to the LDAP server with their credentials.

    Args:
        username: Account in full UPN form (for example ``user@panjit.com.tw``).
        password: The user's password; an empty value is rejected up front.

    Returns:
        A dict with ``dn``, ``email``, ``display_name`` and ``username`` keys
        on success; ``None`` when the input is invalid, the bind or the
        follow-up search fails, or any LDAP error occurs.
    """
    log = current_app.logger

    # The account must be a UPN, i.e. contain an '@'.
    if not username or '@' not in username:
        log.error(f"Invalid username format: {username}. Must use full UPN format (e.g., user@domain.com)")
        return None
    # Never attempt a bind without a password: it must not count as a login.
    if not password:
        log.warning(f"LDAP bind rejected for {username}: empty password")
        return None

    user_upn = username
    cfg = current_app.config
    ldap_server = cfg['LDAP_SERVER']
    ldap_port = cfg['LDAP_PORT']
    use_ssl = cfg['LDAP_USE_SSL']
    server = _build_server()

    try:
        log.debug("[DEBUG] LDAP 連線資訊:")
        log.debug(f"  - 伺服器: {ldap_server}:{ldap_port}")
        log.debug(f"  - SSL: {use_ssl}")
        log.debug(f"  - 使用者 UPN: {user_upn}")
        log.info(f"Connecting to LDAP server: {ldap_server}:{ldap_port}")

        # Binding with the user's own credentials is the authentication step.
        log.debug("[DEBUG] 嘗試 LDAP 連線綁定...")
        with _bound_connection(server, user_upn, password) as conn:
            if not conn.bound:
                # Defensive: auto_bind is expected to raise on a failed bind.
                log.debug("[DEBUG] LDAP 連線綁定失敗! 可能是帳號密碼錯誤")
                log.warning(f"LDAP bind failed for: {user_upn}")
                return None

            log.debug("[DEBUG] LDAP 連線綁定成功!")
            log.info(f"LDAP bind successful for: {user_upn}")

            # Authenticated; now fetch the user's directory details.
            search_base = cfg['LDAP_SEARCH_BASE']
            login_attr = cfg['LDAP_USER_LOGIN_ATTR']
            # Escape the UPN so filter metacharacters cannot alter the query.
            search_filter = f'({login_attr}={escape_filter_chars(user_upn)})'
            log.debug(f"LDAP search - Base: {search_base}, Filter: {search_filter}")
            conn.search(search_base, search_filter,
                        attributes=['mail', 'displayName', 'sAMAccountName'])

            if not conn.entries:
                # Unlikely after a successful bind, but handled explicitly.
                log.warning(f"LDAP bind successful but user not found in search: {user_upn}")
                return None

            entry = conn.entries[0]
            user_info = {
                'dn': entry.entry_dn,  # DN comes straight from the entry object
                'email': _attr_str(entry, 'mail'),
                'display_name': _attr_str(entry, 'displayName'),
                'username': user_upn,  # the original UPN is used as the username
            }
            log.debug("[DEBUG] 使用者詳細資訊:")
            log.debug(f"  - 顯示名稱: {user_info['display_name']}")
            log.debug(f"  - Email: {user_info['email']}")
            log.debug(f"  - 使用者名稱: {user_info['username']}")
            log.debug(f"  - DN: {user_info['dn']}")
            log.info(f"User details retrieved: {user_info['display_name']} ({user_upn})")
            return user_info
    except Exception as e:
        log.debug(f"[DEBUG] LDAP 連線發生異常: {str(e)}")
        log.debug(f"[DEBUG] 伺服器設定 - {ldap_server}:{ldap_port}, SSL={use_ssl}")
        log.error(f"LDAP authentication error for {user_upn}: {str(e)}")
        log.error(f"LDAP server: {ldap_server}, Port: {ldap_port}, SSL: {use_ssl}")
        return None


def get_ldap_group_members(group_name):
    """Return email addresses for the members of an LDAP group or OU.

    A group whose ``cn`` equals *group_name* is tried first. If none exists,
    the first organizational unit whose ``ou`` or ``name`` contains
    *group_name* is used and up to 50 enabled users beneath it are returned.
    Entries without a mail value fall back to
    ``<sAMAccountName>@<fallback domain>``. Searches run with the
    application's service-account credentials.

    Args:
        group_name: Group common name, or a fragment of an OU name.

    Returns:
        A list of email address strings; empty when *group_name* is empty,
        nothing matches, or an LDAP error occurs.
    """
    log = current_app.logger
    log.debug(f"[GROUP DEBUG] 開始獲取群組成員: {group_name}")

    cfg = current_app.config
    ldap_server = cfg['LDAP_SERVER']
    ldap_port = cfg['LDAP_PORT']
    bind_dn = cfg['LDAP_BIND_USER_DN']
    bind_password = cfg['LDAP_BIND_USER_PASSWORD']
    search_base = cfg['LDAP_SEARCH_BASE']

    log.debug("[GROUP DEBUG] LDAP 設定:")
    log.debug(f"[GROUP DEBUG] - 伺服器: {ldap_server}:{ldap_port}")
    log.debug(f"[GROUP DEBUG] - 搜尋基底: {search_base}")

    server = _build_server()

    # An empty name would turn the OU fallback into a match-everything filter.
    if not group_name:
        log.debug(f"[GROUP DEBUG] 找不到群組或組織單位: {group_name}")
        return []

    try:
        safe_name = escape_filter_chars(group_name)
        member_attrs = ['mail', 'sAMAccountName', 'displayName']

        # Bind with the service account.
        with _bound_connection(server, bind_dn, bind_password) as conn:
            if not conn.bound:
                log.error("[GROUP ERROR] Failed to bind to LDAP with service account.")
                return []
            log.debug("[GROUP DEBUG] LDAP 服務帳號連線成功")

            emails = []

            # 1. Look for a group with a matching cn.
            log.debug(f"[GROUP DEBUG] 嘗試搜尋群組 (cn): {group_name}")
            conn.search(search_base, f'(&(objectClass=group)(cn={safe_name}))',
                        attributes=['member', 'mail'])

            if conn.entries:
                group_entry = conn.entries[0]
                log.debug(f"[GROUP DEBUG] 找到群組: {group_entry.entry_dn}")
                # Copy the DNs: the lookups below replace conn.entries.
                members_dn = list(group_entry.member.values) if 'member' in group_entry else []
                log.debug(f"[GROUP DEBUG] 群組有 {len(members_dn)} 個成員")

                for i, member_dn in enumerate(members_dn, start=1):
                    log.debug(f"[GROUP DEBUG] 處理成員 {i}: {member_dn}")
                    try:
                        # BASE scope: read exactly the member entry itself.
                        conn.search(member_dn, '(objectClass=*)',
                                    search_scope=BASE, attributes=member_attrs)
                        if not conn.entries:
                            continue
                        email, generated = _resolve_email(conn.entries[0])
                        if email and generated:
                            emails.append(email)
                            log.debug(f"[GROUP DEBUG] 生成成員郵件: {email}")
                        elif email:
                            emails.append(email)
                            log.debug(f"[GROUP DEBUG] 獲取成員郵件: {email}")
                    except Exception as member_error:
                        # One unreadable member must not abort the whole group.
                        log.debug(f"[GROUP DEBUG] 處理成員錯誤: {member_error}")
            else:
                # 2. No such group: fall back to an organizational unit.
                log.debug(f"[GROUP DEBUG] 群組未找到,嘗試搜尋組織單位: {group_name}")
                ou_search_filter = (
                    f'(&(objectClass=organizationalUnit)'
                    f'(|(ou=*{safe_name}*)(name=*{safe_name}*)))'
                )
                conn.search(search_base, ou_search_filter, attributes=['ou', 'name'])

                if conn.entries:
                    ou_dn = conn.entries[0].entry_dn
                    log.debug(f"[GROUP DEBUG] 找到組織單位: {ou_dn}")

                    # Collect every enabled user below the OU.
                    conn.search(
                        ou_dn,
                        _ENABLED_USER_FILTER,
                        attributes=member_attrs,
                        search_scope=SUBTREE,
                        size_limit=50,
                    )
                    log.debug(f"[GROUP DEBUG] 在組織單位下找到 {len(conn.entries)} 個用戶")

                    for entry in conn.entries:
                        email, generated = _resolve_email(entry)
                        if email and generated:
                            emails.append(email)
                            log.debug(f"[GROUP DEBUG] 生成用戶郵件: {email}")
                        elif email:
                            emails.append(email)
                            log.debug(f"[GROUP DEBUG] 獲取用戶郵件: {email}")
                else:
                    log.debug(f"[GROUP DEBUG] 找不到群組或組織單位: {group_name}")

            log.debug(f"[GROUP DEBUG] 最終獲取 {len(emails)} 個郵件地址")
            return emails
    except Exception as e:
        log.error(f"LDAP group search error: {e}")
        return []


def search_ldap_principals(search_term, limit=10):
    """Search enabled LDAP users whose name, account or mail contains a term.

    Searches run with the application's service-account credentials.

    Args:
        search_term: Text to look for; surrounding whitespace is ignored and
            at least two characters are required.
        limit: Value passed to the server as the search size limit.

    Returns:
        A list of dicts with ``name``, ``email`` and ``username`` keys.
        ``email`` falls back to ``<sAMAccountName>@<fallback domain>`` and
        finally to ``'No Email'``. Entries without a display name are
        skipped. Empty on invalid input or on any LDAP error.
    """
    log = current_app.logger

    term = search_term.strip() if search_term else ''
    if len(term) < 2:
        log.debug(f"[DEBUG] search_ldap_principals: 搜尋詞無效 '{search_term}'")
        return []

    log.debug(f"[DEBUG] search_ldap_principals: 搜尋 '{search_term}'")

    cfg = current_app.config
    ldap_server = cfg['LDAP_SERVER']
    ldap_port = cfg['LDAP_PORT']
    bind_dn = cfg['LDAP_BIND_USER_DN']
    bind_password = cfg['LDAP_BIND_USER_PASSWORD']
    search_base = cfg['LDAP_SEARCH_BASE']
    server = _build_server()

    try:
        log.debug("[DEBUG] LDAP 搜尋設定:")
        log.debug(f"  - 伺服器: {ldap_server}:{ldap_port}")
        log.debug(f"  - 搜尋基底: {search_base}")
        log.debug(f"  - 服務帳號: {bind_dn}")

        # Bind with the service account.
        with _bound_connection(server, bind_dn, bind_password) as conn:
            if not conn.bound:
                log.debug("[DEBUG] Failed to bind to LDAP with service account.")
                return []
            log.debug("[DEBUG] LDAP 服務帳號連線成功")

            # Escape the term so user input cannot change the filter structure.
            safe_term = escape_filter_chars(term)
            search_filter = (
                f'(&(objectClass=user)(objectCategory=person)'
                f'(|(displayName=*{safe_term}*)(sAMAccountName=*{safe_term}*)(mail=*{safe_term}*))'
                f'(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
            )
            log.debug(f"[DEBUG] LDAP 搜尋篩選器: {search_filter}")

            conn.search(
                search_base,
                search_filter,
                attributes=['displayName', 'mail', 'sAMAccountName'],
                size_limit=limit,
            )
            log.debug(f"[DEBUG] LDAP 搜尋找到 {len(conn.entries)} 個條目")

            results = []
            for i, entry in enumerate(conn.entries, start=1):
                display_name = _attr_str(entry, 'displayName')
                email = _attr_str(entry, 'mail')
                sam_account = _attr_str(entry, 'sAMAccountName')
                log.debug(f"[DEBUG] 條目 {i}: {display_name}, {email}, {sam_account}")

                # Only entries with a display name are returned.
                if not display_name:
                    log.debug("[DEBUG] 跳過條目 (缺少顯示名稱)")
                    continue

                # Derive an address from the account name when mail is missing.
                if not email and sam_account:
                    email = f"{sam_account}@{_FALLBACK_MAIL_DOMAIN}"

                result = {
                    'name': display_name,
                    'email': email or 'No Email',
                    'username': sam_account,
                }
                results.append(result)
                log.debug(f"[DEBUG] 加入結果: {result}")

            log.debug(f"[DEBUG] 最終返回 {len(results)} 個結果")
            return results
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        log.exception(f"[DEBUG] LDAP principal search error: {e}")
        return []


def search_ldap_groups(search_term, limit=10):
    """Search LDAP organizational units and groups, e.g. to pick mail recipients.

    Organizational units and groups are queried separately, which keeps each
    filter simple; each query gets half of *limit* (at least 1) as its size
    limit. Searches run with the application's service-account credentials.

    Args:
        search_term: Text to look for; surrounding whitespace is ignored and
            at least two characters are required.
        limit: Overall result budget shared by the two queries.

    Returns:
        A list of dicts with ``name``, ``email``, ``type``
        (``'organizationalUnit'`` or ``'group'``), ``member_count``, ``dn``
        and ``members`` keys. ``members`` is empty for OUs and holds at most
        the first five member DNs for groups. Empty on invalid input or when
        the connection cannot be established.
    """
    log = current_app.logger

    term = search_term.strip() if search_term else ''
    if len(term) < 2:
        log.debug(f"[DEBUG] search_ldap_groups: 搜尋詞無效 '{search_term}'")
        return []

    log.debug(f"[DEBUG] search_ldap_groups: 搜尋群組 '{search_term}'")

    cfg = current_app.config
    ldap_server = cfg['LDAP_SERVER']
    ldap_port = cfg['LDAP_PORT']
    bind_dn = cfg['LDAP_BIND_USER_DN']
    bind_password = cfg['LDAP_BIND_USER_PASSWORD']
    search_base = cfg['LDAP_SEARCH_BASE']
    server = _build_server()

    try:
        log.debug("[DEBUG] LDAP 群組搜尋設定:")
        log.debug(f"  - 伺服器: {ldap_server}:{ldap_port}")
        log.debug(f"  - 搜尋基底: {search_base}")

        safe_term = escape_filter_chars(term)
        # A size limit of 0 means "no limit", so never let the half drop to 0.
        per_query_limit = max(1, limit // 2)

        # Bind with the service account.
        with _bound_connection(server, bind_dn, bind_password) as conn:
            if not conn.bound:
                log.debug("[DEBUG] Failed to bind to LDAP with service account.")
                return []
            log.debug("[DEBUG] LDAP 服務帳號連線成功")

            results = []

            # 1. Organizational units.
            ou_filter = (
                f'(&(objectClass=organizationalUnit)'
                f'(|(ou=*{safe_term}*)(name=*{safe_term}*)))'
            )
            log.debug(f"[DEBUG] LDAP 組織單位搜尋篩選器: {ou_filter}")
            try:
                conn.search(
                    search_base,
                    ou_filter,
                    attributes=['ou', 'name', 'mail'],
                    size_limit=per_query_limit,
                )
                log.debug(f"[DEBUG] 找到 {len(conn.entries)} 個組織單位")

                for entry in conn.entries:
                    group_name = _attr_str(entry, 'ou') or _attr_str(entry, 'name')
                    if not group_name:
                        continue

                    # Count enabled users below the OU (the query is capped at 100).
                    member_count = 0
                    try:
                        # A separate connection keeps conn.entries, which is
                        # being iterated here, untouched.
                        with _bound_connection(server, bind_dn, bind_password) as temp_conn:
                            if temp_conn.bound:
                                temp_conn.search(
                                    entry.entry_dn,
                                    _ENABLED_USER_FILTER,
                                    attributes=['sAMAccountName'],
                                    size_limit=100,
                                    search_scope=SUBTREE,
                                )
                                member_count = len(temp_conn.entries)
                    except Exception as member_err:
                        log.debug(f"[DEBUG] 計算組織成員數錯誤: {member_err}")
                        member_count = 0

                    result = {
                        'name': group_name,
                        'email': _attr_str(entry, 'mail'),
                        'type': 'organizationalUnit',
                        'member_count': member_count,
                        'dn': entry.entry_dn,
                        'members': [],
                    }
                    results.append(result)
                    log.debug(f"[DEBUG] 加入組織單位: {result}")
            except Exception as e:
                log.debug(f"[DEBUG] 組織單位搜尋錯誤: {e}")

            # 2. Groups.
            group_filter = (
                f'(&(objectClass=group)'
                f'(|(cn=*{safe_term}*)(displayName=*{safe_term}*)))'
            )
            log.debug(f"[DEBUG] LDAP 群組搜尋篩選器: {group_filter}")
            try:
                conn.search(
                    search_base,
                    group_filter,
                    attributes=['cn', 'displayName', 'member', 'mail'],
                    size_limit=per_query_limit,
                )
                log.debug(f"[DEBUG] 找到 {len(conn.entries)} 個群組")

                for i, entry in enumerate(conn.entries, start=1):
                    cn = _attr_str(entry, 'cn')
                    display_name = _attr_str(entry, 'displayName')
                    members = entry.member.values if 'member' in entry else []
                    group_name = display_name or cn

                    log.debug(f"[DEBUG] 群組 {i}: CN={cn}, DisplayName={display_name}")
                    log.debug(f"[DEBUG] 群組名稱: {group_name}, 成員數: {len(members)}")

                    if group_name:
                        result = {
                            'name': group_name,
                            'email': _attr_str(entry, 'mail'),
                            'type': 'group',
                            'member_count': len(members),
                            'dn': entry.entry_dn,
                            'members': members[:5],  # preview: first five members only
                        }
                        results.append(result)
                        log.debug(f"[DEBUG] 加入群組結果: {result}")
            except Exception as e:
                log.debug(f"[DEBUG] 群組搜尋錯誤: {e}")

            log.debug(f"[DEBUG] 最終返回 {len(results)} 個群組結果")
            return results
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        log.exception(f"[DEBUG] LDAP group search error: {e}")
        return []


def get_group_member_emails(group_dn):
    """Return the mail addresses of all members of the group at *group_dn*.

    Members without a mail value are skipped (no fallback address is
    generated). Searches run with the application's service-account
    credentials.

    Args:
        group_dn: Distinguished name of the group entry.

    Returns:
        A list of email address strings; empty when the group has no
        ``member`` attribute, cannot be read, or an LDAP error occurs.
    """
    log = current_app.logger
    cfg = current_app.config
    bind_dn = cfg['LDAP_BIND_USER_DN']
    bind_password = cfg['LDAP_BIND_USER_PASSWORD']
    server = _build_server()

    try:
        with _bound_connection(server, bind_dn, bind_password) as conn:
            if not conn.bound:
                log.error("Failed to bind to LDAP with service account.")
                return []

            # Read the group entry itself (BASE scope) to get its member DNs.
            conn.search(group_dn, '(objectClass=*)',
                        search_scope=BASE, attributes=['member'])
            if not conn.entries or 'member' not in conn.entries[0]:
                return []

            # Copy the DNs: the lookups below replace conn.entries.
            members_dn = list(conn.entries[0].member.values)
            emails = []
            for member_dn in members_dn:
                conn.search(member_dn, '(objectClass=*)',
                            search_scope=BASE, attributes=['mail'])
                if conn.entries:
                    mail = _attr_str(conn.entries[0], 'mail')
                    if mail:
                        emails.append(mail)
            return emails
    except Exception as e:
        log.error(f"Get group member emails error: {e}")
        return []