企業內部新聞彙整與分析系統 - 自動新聞抓取 (Digitimes, 經濟日報, 工商時報) - AI 智慧摘要 (OpenAI/Claude/Ollama) - 群組管理與訂閱通知 - 已清理 Python 快取檔案 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
"""
|
||
安全認證模組
|
||
處理密碼雜湊、JWT Token、LDAP 認證
|
||
"""
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Any
|
||
from jose import JWTError, jwt
|
||
import bcrypt
|
||
from ldap3 import Server, Connection, ALL, NTLM
|
||
from ldap3.core.exceptions import LDAPException
|
||
from ldap3.utils.conv import escape_filter_chars
|
||
import logging
|
||
|
||
from app.core.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||
"""驗證密碼"""
|
||
return bcrypt.checkpw(
|
||
plain_password.encode('utf-8'),
|
||
hashed_password.encode('utf-8')
|
||
)
|
||
|
||
|
||
def get_password_hash(password: str) -> str:
|
||
"""產生密碼雜湊"""
|
||
return bcrypt.hashpw(
|
||
password.encode('utf-8'),
|
||
bcrypt.gensalt()
|
||
).decode('utf-8')
|
||
|
||
|
||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
||
"""建立 JWT Access Token"""
|
||
to_encode = data.copy()
|
||
if expires_delta:
|
||
expire = datetime.utcnow() + expires_delta
|
||
else:
|
||
expire = datetime.utcnow() + timedelta(minutes=settings.jwt_access_token_expire_minutes)
|
||
to_encode.update({"exp": expire})
|
||
encoded_jwt = jwt.encode(to_encode, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
|
||
return encoded_jwt
|
||
|
||
|
||
def decode_access_token(token: str) -> Optional[dict]:
|
||
"""解碼 JWT Access Token"""
|
||
try:
|
||
payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
|
||
return payload
|
||
except JWTError:
|
||
return None
|
||
|
||
|
||
def verify_ldap_credentials(username: str, password: str) -> Optional[dict]:
|
||
"""
|
||
驗證 LDAP/AD 憑證
|
||
|
||
Returns:
|
||
成功時返回用戶資訊 dict,失敗返回 None
|
||
"""
|
||
if not settings.ldap_server:
|
||
return None
|
||
|
||
try:
|
||
server = Server(settings.ldap_server, port=settings.ldap_port, get_info=ALL)
|
||
|
||
# 嘗試綁定(使用 NTLM 或簡單綁定)
|
||
user_dn = f"{username}@{settings.ldap_base_dn.replace('DC=', '').replace(',', '.')}"
|
||
|
||
conn = Connection(
|
||
server,
|
||
user=user_dn,
|
||
password=password,
|
||
authentication=NTLM,
|
||
auto_bind=True
|
||
)
|
||
|
||
if conn.bound:
|
||
# 查詢用戶資訊
|
||
# 轉義特殊字元,防止 LDAP 注入
|
||
safe_username = escape_filter_chars(username)
|
||
search_filter = f"(sAMAccountName={safe_username})"
|
||
conn.search(
|
||
settings.ldap_base_dn,
|
||
search_filter,
|
||
attributes=['displayName', 'mail', 'department']
|
||
)
|
||
|
||
if conn.entries:
|
||
entry = conn.entries[0]
|
||
return {
|
||
"username": username,
|
||
"display_name": str(entry.displayName) if hasattr(entry, 'displayName') else username,
|
||
"email": str(entry.mail) if hasattr(entry, 'mail') else None,
|
||
"department": str(entry.department) if hasattr(entry, 'department') else None
|
||
}
|
||
|
||
conn.unbind()
|
||
return {"username": username, "display_name": username}
|
||
|
||
return None
|
||
|
||
except LDAPException as e:
|
||
logger.error("LDAP 認證失敗", exc_info=True) # 不記錄詳細錯誤
|
||
return None
|
||
except Exception as e:
|
||
logger.error("LDAP 連線錯誤", exc_info=True) # 不記錄詳細錯誤
|
||
return None
|
||
|
||
|
||
class TokenData:
|
||
"""Token 資料結構"""
|
||
def __init__(self, user_id: int, username: str, role: str):
|
||
self.user_id = user_id
|
||
self.username = username
|
||
self.role = role
|