#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ LDAP 認證服務 Author: PANJIT IT Team Created: 2024-01-28 Modified: 2024-01-28 """ import time from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES from flask import current_app from .logger import get_logger from .exceptions import AuthenticationError logger = get_logger(__name__) class LDAPAuthService: """LDAP 認證服務""" def __init__(self): self.config = current_app.config self.server_url = self.config.get('LDAP_SERVER') self.port = self.config.get('LDAP_PORT', 389) self.use_ssl = self.config.get('LDAP_USE_SSL', False) self.bind_user_dn = self.config.get('LDAP_BIND_USER_DN') self.bind_password = self.config.get('LDAP_BIND_USER_PASSWORD') self.search_base = self.config.get('LDAP_SEARCH_BASE') self.login_attr = self.config.get('LDAP_USER_LOGIN_ATTR', 'userPrincipalName') def create_connection(self, retries=3): """建立 LDAP 連線(帶重試機制)""" for attempt in range(retries): try: server = Server( self.server_url, port=self.port, use_ssl=self.use_ssl, get_info=ALL_ATTRIBUTES ) conn = Connection( server, user=self.bind_user_dn, password=self.bind_password, auto_bind=True, raise_exceptions=True ) logger.info("LDAP connection established successfully") return conn except Exception as e: logger.error(f"LDAP connection attempt {attempt + 1} failed: {str(e)}") if attempt == retries - 1: raise AuthenticationError(f"LDAP connection failed: {str(e)}") time.sleep(1) return None def authenticate_user(self, username, password): """驗證使用者憑證""" try: conn = self.create_connection() if not conn: raise AuthenticationError("Unable to connect to LDAP server") # 搜尋使用者 search_filter = f"(&(objectClass=person)(objectCategory=person)({self.login_attr}={username}))" conn.search( self.search_base, search_filter, SUBTREE, attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName', 'department'] ) if not conn.entries: logger.warning(f"User not found: {username}") raise AuthenticationError("帳號不存在") user_entry = conn.entries[0] user_dn = user_entry.entry_dn # 驗證使用者密碼 try: user_conn = Connection( conn.server, user=user_dn, password=password, auto_bind=True, raise_exceptions=True ) user_conn.unbind() # 返回使用者資訊 user_info = { 'username': str(user_entry.sAMAccountName) if user_entry.sAMAccountName else username, 'display_name': str(user_entry.displayName) if user_entry.displayName else username, 'email': str(user_entry.mail) if user_entry.mail else f"{username}@panjit.com.tw", 'department': str(user_entry.department) if hasattr(user_entry, 'department') and user_entry.department else None, 'user_principal_name': str(user_entry.userPrincipalName) if user_entry.userPrincipalName else username } logger.info(f"User authenticated successfully: {username}") return user_info except Exception as e: logger.warning(f"Authentication failed for user {username}: {str(e)}") raise AuthenticationError("密碼錯誤") except AuthenticationError: raise except Exception as e: logger.error(f"LDAP authentication error: {str(e)}") raise AuthenticationError(f"認證服務錯誤: {str(e)}") finally: if 'conn' in locals() and conn: conn.unbind() def search_users(self, search_term, limit=20): """搜尋使用者""" try: conn = self.create_connection() if not conn: return [] # 建構搜尋過濾器 search_filter = f"""(& (objectClass=person) (objectCategory=person) (!(userAccountControl:1.2.840.113556.1.4.803:=2)) (| (displayName=*{search_term}*) (mail=*{search_term}*) (sAMAccountName=*{search_term}*) (userPrincipalName=*{search_term}*) ) )""" # 移除多餘空白 search_filter = ' '.join(search_filter.split()) conn.search( self.search_base, search_filter, SUBTREE, attributes=['sAMAccountName', 'displayName', 'mail', 'department'], size_limit=limit ) results = [] for entry in conn.entries: results.append({ 'username': str(entry.sAMAccountName) if entry.sAMAccountName else '', 'display_name': str(entry.displayName) if entry.displayName else '', 'email': str(entry.mail) if entry.mail else '', 'department': str(entry.department) if hasattr(entry, 'department') and entry.department else '' }) logger.info(f"LDAP search found {len(results)} results for term: {search_term}") return results except Exception as e: logger.error(f"LDAP search error: {str(e)}") return [] finally: if 'conn' in locals() and conn: conn.unbind() def get_user_info(self, username): """取得使用者詳細資訊""" try: conn = self.create_connection() if not conn: return None # 支援 sAMAccountName 和 userPrincipalName 格式 if '@' in username: search_filter = f"""(& (objectClass=person) (| (userPrincipalName={username}) (mail={username}) ) )""" else: search_filter = f"(&(objectClass=person)(sAMAccountName={username}))" # 移除多餘空白 search_filter = ' '.join(search_filter.split()) conn.search( self.search_base, search_filter, SUBTREE, attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName', 'department'] ) if not conn.entries: return None entry = conn.entries[0] return { 'username': str(entry.sAMAccountName) if entry.sAMAccountName else username, 'display_name': str(entry.displayName) if entry.displayName else username, 'email': str(entry.mail) if entry.mail else f"{username}@panjit.com.tw", 'department': str(entry.department) if hasattr(entry, 'department') and entry.department else None, 'user_principal_name': str(entry.userPrincipalName) if entry.userPrincipalName else '' } except Exception as e: logger.error(f"Error getting user info for {username}: {str(e)}") return None finally: if 'conn' in locals() and conn: conn.unbind() def test_connection(self): """測試 LDAP 連線(健康檢查用)""" try: conn = self.create_connection(retries=1) if conn: conn.unbind() return True return False except Exception as e: logger.error(f"LDAP connection test failed: {str(e)}") return False