Initial commit: Daily News App
企業內部新聞彙整與分析系統 - 自動新聞抓取 (Digitimes, 經濟日報, 工商時報) - AI 智慧摘要 (OpenAI/Claude/Ollama) - 群組管理與訂閱通知 - 已清理 Python 快取檔案 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
118
app/core/security.py
Normal file
118
app/core/security.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""
|
||||
安全認證模組
|
||||
處理密碼雜湊、JWT Token、LDAP 認證
|
||||
"""
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Any
|
||||
from jose import JWTError, jwt
|
||||
import bcrypt
|
||||
from ldap3 import Server, Connection, ALL, NTLM
|
||||
from ldap3.core.exceptions import LDAPException
|
||||
from ldap3.utils.conv import escape_filter_chars
|
||||
import logging
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
"""驗證密碼"""
|
||||
return bcrypt.checkpw(
|
||||
plain_password.encode('utf-8'),
|
||||
hashed_password.encode('utf-8')
|
||||
)
|
||||
|
||||
|
||||
def get_password_hash(password: str) -> str:
|
||||
"""產生密碼雜湊"""
|
||||
return bcrypt.hashpw(
|
||||
password.encode('utf-8'),
|
||||
bcrypt.gensalt()
|
||||
).decode('utf-8')
|
||||
|
||||
|
||||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
||||
"""建立 JWT Access Token"""
|
||||
to_encode = data.copy()
|
||||
if expires_delta:
|
||||
expire = datetime.utcnow() + expires_delta
|
||||
else:
|
||||
expire = datetime.utcnow() + timedelta(minutes=settings.jwt_access_token_expire_minutes)
|
||||
to_encode.update({"exp": expire})
|
||||
encoded_jwt = jwt.encode(to_encode, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
def decode_access_token(token: str) -> Optional[dict]:
|
||||
"""解碼 JWT Access Token"""
|
||||
try:
|
||||
payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
|
||||
return payload
|
||||
except JWTError:
|
||||
return None
|
||||
|
||||
|
||||
def verify_ldap_credentials(username: str, password: str) -> Optional[dict]:
|
||||
"""
|
||||
驗證 LDAP/AD 憑證
|
||||
|
||||
Returns:
|
||||
成功時返回用戶資訊 dict,失敗返回 None
|
||||
"""
|
||||
if not settings.ldap_server:
|
||||
return None
|
||||
|
||||
try:
|
||||
server = Server(settings.ldap_server, port=settings.ldap_port, get_info=ALL)
|
||||
|
||||
# 嘗試綁定(使用 NTLM 或簡單綁定)
|
||||
user_dn = f"{username}@{settings.ldap_base_dn.replace('DC=', '').replace(',', '.')}"
|
||||
|
||||
conn = Connection(
|
||||
server,
|
||||
user=user_dn,
|
||||
password=password,
|
||||
authentication=NTLM,
|
||||
auto_bind=True
|
||||
)
|
||||
|
||||
if conn.bound:
|
||||
# 查詢用戶資訊
|
||||
# 轉義特殊字元,防止 LDAP 注入
|
||||
safe_username = escape_filter_chars(username)
|
||||
search_filter = f"(sAMAccountName={safe_username})"
|
||||
conn.search(
|
||||
settings.ldap_base_dn,
|
||||
search_filter,
|
||||
attributes=['displayName', 'mail', 'department']
|
||||
)
|
||||
|
||||
if conn.entries:
|
||||
entry = conn.entries[0]
|
||||
return {
|
||||
"username": username,
|
||||
"display_name": str(entry.displayName) if hasattr(entry, 'displayName') else username,
|
||||
"email": str(entry.mail) if hasattr(entry, 'mail') else None,
|
||||
"department": str(entry.department) if hasattr(entry, 'department') else None
|
||||
}
|
||||
|
||||
conn.unbind()
|
||||
return {"username": username, "display_name": username}
|
||||
|
||||
return None
|
||||
|
||||
except LDAPException as e:
|
||||
logger.error("LDAP 認證失敗", exc_info=True) # 不記錄詳細錯誤
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error("LDAP 連線錯誤", exc_info=True) # 不記錄詳細錯誤
|
||||
return None
|
||||
|
||||
|
||||
class TokenData:
|
||||
"""Token 資料結構"""
|
||||
def __init__(self, user_id: int, username: str, role: str):
|
||||
self.user_id = user_id
|
||||
self.username = username
|
||||
self.role = role
|
||||
Reference in New Issue
Block a user