232 lines
8.6 KiB
Python
232 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
LDAP 認證服務
|
|
|
|
Author: PANJIT IT Team
|
|
Created: 2024-01-28
|
|
Modified: 2024-01-28
|
|
"""
|
|
|
|
import time
|
|
from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
|
|
from flask import current_app
|
|
from .logger import get_logger
|
|
from .exceptions import AuthenticationError
|
|
|
|
# Module-level logger used by the LDAPAuthService methods below.
logger = get_logger(__name__)
|
|
|
|
|
|
class LDAPAuthService:
    """LDAP authentication service.

    Wraps ldap3 to verify user credentials, search the directory and look up
    individual users. Connection settings are read from the Flask application
    config when the service is instantiated.
    """

    # Attributes requested whenever a single user record is fetched.
    _USER_ATTRIBUTES = ('displayName', 'mail', 'sAMAccountName', 'userPrincipalName', 'department')

    # Characters that RFC 4515 requires to be escaped inside a filter
    # assertion value, mapped to their backslash-hex form.
    _FILTER_ESCAPES = str.maketrans({
        '\\': '\\5c',
        '*': '\\2a',
        '(': '\\28',
        ')': '\\29',
        '\x00': '\\00',
    })

    def __init__(self):
        """Load the LDAP settings from ``current_app.config``.

        Defaults: port 389, SSL disabled, login attribute
        ``userPrincipalName``. The remaining keys default to ``None``.
        """
        self.config = current_app.config
        self.server_url = self.config.get('LDAP_SERVER')
        self.port = self.config.get('LDAP_PORT', 389)
        self.use_ssl = self.config.get('LDAP_USE_SSL', False)
        self.bind_user_dn = self.config.get('LDAP_BIND_USER_DN')
        self.bind_password = self.config.get('LDAP_BIND_USER_PASSWORD')
        self.search_base = self.config.get('LDAP_SEARCH_BASE')
        self.login_attr = self.config.get('LDAP_USER_LOGIN_ATTR', 'userPrincipalName')

    @staticmethod
    def _escape_filter_value(value):
        """Escape *value* for safe use inside an LDAP search filter.

        Caller-supplied text is interpolated into filter strings; without
        escaping, characters such as ``*`` or ``)(`` would change the filter's
        structure (LDAP injection).
        """
        return str(value).translate(LDAPAuthService._FILTER_ESCAPES)

    @staticmethod
    def _attr_text(entry, name, default):
        """Return ``str(entry.<name>)`` if present and non-empty, else *default*."""
        value = getattr(entry, name, None)
        return str(value) if value else default

    @staticmethod
    def _fallback_email(username):
        """Return the e-mail address to report when the entry has no ``mail``.

        A login name that already contains ``@`` is used as-is, so the company
        domain is never appended twice.
        """
        username = str(username)
        return username if '@' in username else f"{username}@panjit.com.tw"

    def create_connection(self, retries=3):
        """Open a connection bound with the service account, retrying on failure.

        Args:
            retries: Maximum number of attempts; one second is slept between
                failed attempts.

        Returns:
            The bound ldap3 ``Connection``, or ``None`` when ``retries`` is
            less than one (no attempt is made).

        Raises:
            AuthenticationError: If the last attempt fails.
        """
        for attempt in range(retries):
            try:
                # NOTE(review): ALL_ATTRIBUTES is the search-attribute
                # wildcard; Server's get_info usually takes NONE/DSA/SCHEMA/ALL.
                # Kept unchanged to preserve behavior - confirm the intent.
                server = Server(
                    self.server_url,
                    port=self.port,
                    use_ssl=self.use_ssl,
                    get_info=ALL_ATTRIBUTES,
                )
                conn = Connection(
                    server,
                    user=self.bind_user_dn,
                    password=self.bind_password,
                    auto_bind=True,
                    raise_exceptions=True,
                )
                logger.info("LDAP connection established successfully")
                return conn
            except Exception as e:
                logger.error(f"LDAP connection attempt {attempt + 1} failed: {str(e)}")
                if attempt == retries - 1:
                    raise AuthenticationError(f"LDAP connection failed: {str(e)}") from e
                time.sleep(1)

        return None

    def authenticate_user(self, username, password):
        """Verify a user's credentials against the directory.

        The user is located with the service account, then a second bind is
        attempted with the user's own DN and the supplied password.

        Args:
            username: Login name matched against the configured login attribute.
            password: The user's password.

        Returns:
            A dict with the keys ``username``, ``display_name``, ``email``,
            ``department`` and ``user_principal_name``.

        Raises:
            AuthenticationError: If the server is unreachable, the account is
                not found, the password is rejected, or any other error occurs.
        """
        conn = None
        try:
            conn = self.create_connection()
            if not conn:
                raise AuthenticationError("Unable to connect to LDAP server")

            # Locate the user; the login name is escaped so it cannot alter
            # the structure of the filter.
            safe_username = self._escape_filter_value(username)
            search_filter = f"(&(objectClass=person)(objectCategory=person)({self.login_attr}={safe_username}))"

            conn.search(
                self.search_base,
                search_filter,
                SUBTREE,
                attributes=list(self._USER_ATTRIBUTES),
            )

            if not conn.entries:
                logger.warning(f"User not found: {username}")
                raise AuthenticationError("帳號不存在")

            user_entry = conn.entries[0]
            user_dn = user_entry.entry_dn

            # Defence in depth: a simple bind with a DN and an empty password
            # is an unauthenticated bind (RFC 4513, section 5.1.2) and must
            # never count as a successful login.
            if not password:
                logger.warning(f"Authentication failed for user {username}: empty password")
                raise AuthenticationError("密碼錯誤")

            # Verify the password by binding as the user. Only the bind is
            # guarded, so unrelated failures are not reported as a wrong password.
            try:
                user_conn = Connection(
                    conn.server,
                    user=user_dn,
                    password=password,
                    auto_bind=True,
                    raise_exceptions=True,
                )
                user_conn.unbind()
            except Exception as e:
                logger.warning(f"Authentication failed for user {username}: {str(e)}")
                raise AuthenticationError("密碼錯誤") from e

            # Build the user record returned to the caller.
            user_info = {
                'username': self._attr_text(user_entry, 'sAMAccountName', username),
                'display_name': self._attr_text(user_entry, 'displayName', username),
                'email': self._attr_text(user_entry, 'mail', self._fallback_email(username)),
                'department': self._attr_text(user_entry, 'department', None),
                'user_principal_name': self._attr_text(user_entry, 'userPrincipalName', username),
            }

            logger.info(f"User authenticated successfully: {username}")
            return user_info

        except AuthenticationError:
            raise
        except Exception as e:
            logger.error(f"LDAP authentication error: {str(e)}")
            raise AuthenticationError(f"認證服務錯誤: {str(e)}") from e
        finally:
            if conn:
                conn.unbind()

    def search_users(self, search_term, limit=20):
        """Search enabled user accounts by partial match.

        The term is matched as a substring of ``displayName``, ``mail``,
        ``sAMAccountName`` or ``userPrincipalName``.

        Args:
            search_term: Text to look for.
            limit: Maximum number of entries requested from the server.

        Returns:
            A list of dicts with the keys ``username``, ``display_name``,
            ``email`` and ``department``; an empty list on any error.
        """
        conn = None
        try:
            conn = self.create_connection()
            if not conn:
                return []

            # Escape the term so wildcards or parentheses typed by the caller
            # are matched literally instead of rewriting the filter.
            term = self._escape_filter_value(search_term)

            # The userAccountControl clause excludes accounts with bit 2 set.
            search_filter = f"""(&
                (objectClass=person)
                (objectCategory=person)
                (!(userAccountControl:1.2.840.113556.1.4.803:=2))
                (|
                    (displayName=*{term}*)
                    (mail=*{term}*)
                    (sAMAccountName=*{term}*)
                    (userPrincipalName=*{term}*)
                )
            )"""

            # Collapse the layout whitespace of the literal above.
            search_filter = ' '.join(search_filter.split())

            conn.search(
                self.search_base,
                search_filter,
                SUBTREE,
                attributes=['sAMAccountName', 'displayName', 'mail', 'department'],
                size_limit=limit,
            )

            results = [
                {
                    'username': self._attr_text(entry, 'sAMAccountName', ''),
                    'display_name': self._attr_text(entry, 'displayName', ''),
                    'email': self._attr_text(entry, 'mail', ''),
                    'department': self._attr_text(entry, 'department', ''),
                }
                for entry in conn.entries
            ]

            logger.info(f"LDAP search found {len(results)} results for term: {search_term}")
            return results

        except Exception as e:
            logger.error(f"LDAP search error: {str(e)}")
            return []
        finally:
            if conn:
                conn.unbind()

    def get_user_info(self, username):
        """Fetch the directory record of a single user.

        Args:
            username: Either a ``sAMAccountName`` or, when it contains ``@``,
                a ``userPrincipalName`` / ``mail`` value.

        Returns:
            A dict with the keys ``username``, ``display_name``, ``email``,
            ``department`` and ``user_principal_name``, or ``None`` when the
            user is not found or an error occurs.
        """
        conn = None
        try:
            conn = self.create_connection()
            if not conn:
                return None

            safe_username = self._escape_filter_value(username)

            # Accept both the sAMAccountName and the userPrincipalName format.
            if '@' in username:
                search_filter = f"""(&
                    (objectClass=person)
                    (|
                        (userPrincipalName={safe_username})
                        (mail={safe_username})
                    )
                )"""
            else:
                search_filter = f"(&(objectClass=person)(sAMAccountName={safe_username}))"

            # Collapse the layout whitespace of the literal above.
            search_filter = ' '.join(search_filter.split())

            conn.search(
                self.search_base,
                search_filter,
                SUBTREE,
                attributes=list(self._USER_ATTRIBUTES),
            )

            if not conn.entries:
                return None

            entry = conn.entries[0]
            return {
                'username': self._attr_text(entry, 'sAMAccountName', username),
                'display_name': self._attr_text(entry, 'displayName', username),
                'email': self._attr_text(entry, 'mail', self._fallback_email(username)),
                'department': self._attr_text(entry, 'department', None),
                'user_principal_name': self._attr_text(entry, 'userPrincipalName', ''),
            }

        except Exception as e:
            logger.error(f"Error getting user info for {username}: {str(e)}")
            return None
        finally:
            if conn:
                conn.unbind()

    def test_connection(self):
        """Check that the LDAP server can be reached (for health checks).

        Returns:
            ``True`` if a single connection attempt succeeds, else ``False``.
        """
        try:
            conn = self.create_connection(retries=1)
            if not conn:
                return False
            conn.unbind()
            return True
        except Exception as e:
            logger.error(f"LDAP connection test failed: {str(e)}")
            return False