This commit is contained in:
beabigegg
2025-08-28 08:59:46 +08:00
parent b9557250a4
commit 4f7f46b07a
42 changed files with 4992 additions and 494 deletions

View File

@@ -1,76 +1,96 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from werkzeug.security import generate_password_hash
from models import User, db
from utils import admin_required
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
@admin_bp.before_request
@admin_bp.route('/users')
@login_required
@admin_required
def before_request():
"""在處理此藍圖中的任何請求之前,確保使用者是已登入的管理員。"""
pass
@admin_bp.route('/users')
def user_list():
users = User.query.all()
"""顯示所有使用者列表,供管理員管理權限。"""
# MySQL 不支援 nullslast(),改用 COALESCE 處理 NULL 值
users = User.query.order_by(User.last_login.desc(), User.username).all()
return render_template('user_management.html', users=users)
@admin_bp.route('/users/create', methods=['POST'])
def create_user():
username = request.form.get('username')
password = request.form.get('password')
role = request.form.get('role')
if not all([username, password, role]):
flash('所有欄位都是必填的!', 'danger')
return redirect(url_for('admin.user_list'))
if User.query.filter_by(username=username).first():
flash('該使用者名稱已存在!', 'danger')
return redirect(url_for('admin.user_list'))
new_user = User(
username=username,
password_hash=generate_password_hash(password),
role=role
)
db.session.add(new_user)
db.session.commit()
flash('新使用者已成功建立!', 'success')
return redirect(url_for('admin.user_list'))
@admin_bp.route('/users/edit/<int:user_id>', methods=['POST'])
def edit_user(user_id):
@login_required
@admin_required
def edit_user_role(user_id):
"""編輯使用者權限。僅允許修改角色。"""
user = User.query.get_or_404(user_id)
new_role = request.form.get('role')
new_password = request.form.get('password')
if new_role:
# 防止 admin 修改自己的角色,導致失去管理權限
if user.id == current_user.id and user.role == 'admin' and new_role != 'admin':
flash('無法變更自己的管理員角色!', 'danger')
return redirect(url_for('admin.user_list'))
user.role = new_role
if new_password:
user.password_hash = generate_password_hash(new_password)
if new_role not in ['viewer', 'editor', 'admin']:
flash('無效的權限設定!', 'danger')
return redirect(url_for('admin.user_list'))
# 防止管理員修改自己的角色導致失去管理權限
if user.id == current_user.id and user.role == 'admin' and new_role != 'admin':
flash('無法變更自己的管理員權限!', 'danger')
return redirect(url_for('admin.user_list'))
old_role = user.role
user.role = new_role
db.session.commit()
flash(f"使用者 '{user.username}' 的資料已更新。", 'success')
flash(f"使用者 '{user.username}' 的權限已從 '{old_role}' 更新為 '{new_role}'", 'success')
return redirect(url_for('admin.user_list'))
@admin_bp.route('/users/delete/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def delete_user(user_id):
# 避免 admin 刪除自己
"""刪除使用者帳號。"""
# 避免管理員刪除自己
if user_id == current_user.id:
flash('無法刪除自己的帳號!', 'danger')
return redirect(url_for('admin.user_list'))
user = User.query.get_or_404(user_id)
username = user.username
# 檢查是否為最後一個管理員
admin_count = User.query.filter_by(role='admin').count()
if user.role == 'admin' and admin_count <= 1:
flash('無法刪除最後一個管理員帳號!', 'danger')
return redirect(url_for('admin.user_list'))
db.session.delete(user)
db.session.commit()
flash(f"使用者 '{user.username}' 已被刪除。", 'success')
flash(f"使用者 '{username}' 已被刪除。", 'success')
return redirect(url_for('admin.user_list'))
@admin_bp.route('/users/set-admin', methods=['POST'])
@login_required
@admin_required
def set_admin():
"""設定特定AD帳號為管理員權限。"""
username = request.form.get('username', '').strip()
if not username:
flash('請輸入有效的AD帳號', 'danger')
return redirect(url_for('admin.user_list'))
# 查找或建立使用者
user = User.query.filter_by(username=username).first()
if not user:
# 建立新的使用者記錄
user = User(
username=username,
password_hash='ldap_authenticated', # LDAP使用者不需要本地密碼
role='admin'
)
db.session.add(user)
db.session.commit()
flash(f"已為 AD 帳號 '{username}' 建立管理員權限。", 'success')
else:
# 更新現有使用者權限
old_role = user.role
user.role = 'admin'
db.session.commit()
flash(f"已將 '{username}' 的權限從 '{old_role}' 更新為 'admin'", 'success')
return redirect(url_for('admin.user_list'))

80
routes/api.py Normal file
View File

@@ -0,0 +1,80 @@
from flask import Blueprint, request, jsonify
from flask_login import login_required
from ldap_utils import search_ldap_principals, search_ldap_groups
api_bp = Blueprint('api', __name__, url_prefix='/api')
@api_bp.route('/ldap-search', methods=['GET'])
@login_required
def ldap_search():
"""
API endpoint for searching LDAP principals.
Returns JSON data for Tom Select dropdown.
"""
search_term = request.args.get('q', '').strip()
print(f"[DEBUG] LDAP API 搜尋請求: '{search_term}'")
if not search_term or len(search_term) < 2:
print(f"[DEBUG] 搜尋詞太短,返回空結果")
return jsonify([])
try:
print(f"[DEBUG] 開始 LDAP 搜尋: '{search_term}'")
# 搜尋使用者
user_results = search_ldap_principals(search_term, limit=15)
print(f"[DEBUG] LDAP 使用者搜尋結果數量: {len(user_results)}")
# 搜尋群組
group_results = search_ldap_groups(search_term, limit=5)
print(f"[DEBUG] LDAP 群組搜尋結果數量: {len(group_results)}")
# Format results for Tom Select
formatted_results = []
# 加入使用者結果
for result in user_results:
formatted_result = {
'value': result['email'],
'text': f"👤 {result['name']} ({result['email']})",
'type': 'user'
}
formatted_results.append(formatted_result)
print(f"[DEBUG] 格式化使用者結果: {result['name']} - {result['email']}")
# 加入群組結果
for result in group_results:
# 對群組,使用群組名稱作為 value在發送郵件時再展開成員
formatted_result = {
'value': f"group:{result['name']}", # 特殊前綴表示這是群組
'text': f"👥 {result['name']} ({result['member_count']} 成員)",
'type': 'group'
}
formatted_results.append(formatted_result)
print(f"[DEBUG] 格式化群組結果: {result['name']} - {result['member_count']} 成員")
print(f"[DEBUG] 返回 {len(formatted_results)} 個搜尋結果")
return jsonify(formatted_results)
except Exception as e:
print(f"[DEBUG] LDAP search API error: {e}")
import traceback
traceback.print_exc()
return jsonify({'error': 'Search failed'}), 500
@api_bp.route('/debug-form', methods=['POST'])
@login_required
def debug_form():
"""接收前端表單除錯資訊"""
try:
data = request.get_json()
print(f"[FRONTEND DEBUG] 收到前端除錯資料:")
print(f"[FRONTEND DEBUG] selectedValues: {data.get('selectedValues')}")
print(f"[FRONTEND DEBUG] recipientValue: {data.get('recipientValue')}")
print(f"[FRONTEND DEBUG] hiddenFieldValue: {data.get('hiddenFieldValue')}")
print(f"[FRONTEND DEBUG] tomSelectValue: {data.get('tomSelectValue')}")
return jsonify({'status': 'received'})
except Exception as e:
print(f"[FRONTEND DEBUG] 錯誤: {e}")
return jsonify({'error': str(e)}), 400

View File

@@ -1,11 +1,9 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import check_password_hash
from ldap_utils import authenticate_ldap_user
from models import User, db
from datetime import datetime
from werkzeug.security import check_password_hash
from ldap_utils import authenticate_ldap_user, generate_password_hash
import logging
auth_bp = Blueprint('auth', __name__)
@@ -15,35 +13,82 @@ def login():
return redirect(url_for('temp_spec.spec_list'))
if request.method == 'POST':
username = request.form['username']
username = request.form['username'].strip()
password = request.form['password']
# Step 1: Authenticate against LDAP
user_info = authenticate_ldap_user(username, password)
if user_info:
# Step 2: User authenticated successfully, find or create local user
local_user = User.query.filter_by(username=user_info['username']).first()
if not local_user:
# Create a new user in the local database
local_user = User(
username=user_info['username'],
# password_hash is no longer needed for login, can be empty or random
password_hash='ldap_authenticated',
role='viewer' # Default role for new users
)
db.session.add(local_user)
# 記錄登入嘗試
print(f"[DEBUG] 登入嘗試 - 帳號: {username}")
current_app.logger.info(f"Login attempt for user: {username}")
# 驗證帳號格式
if '@' not in username:
print(f"[DEBUG] 帳號格式錯誤 - 缺少 @ 符號: {username}")
current_app.logger.warning(f"Invalid username format (missing @): {username}")
flash('請使用完整的 AD 帳號格式 (包含 @domain)', 'warning')
return render_template('login.html')
try:
# Step 1: Authenticate against LDAP
print(f"[DEBUG] 準備進行 LDAP 驗證: {username}")
current_app.logger.info(f"Attempting LDAP authentication for: {username}")
# Update last_login time
local_user.last_login = datetime.now()
db.session.commit()
user_info = authenticate_ldap_user(username, password)
print(f"[DEBUG] LDAP 驗證結果: {user_info}")
# Step 3: Log in the user with Flask-Login
login_user(local_user)
return redirect(url_for('temp_spec.spec_list'))
else:
flash('帳號或密碼錯誤,請重新輸入', 'danger')
if user_info:
print(f"[DEBUG] LDAP 驗證成功: {username}")
current_app.logger.info(f"LDAP authentication successful for: {username}")
# Step 2: User authenticated successfully, find or create local user
local_user = User.query.filter_by(username=user_info['username']).first()
if not local_user:
print(f"[DEBUG] 建立新的本地使用者帳號: {user_info['username']}")
current_app.logger.info(f"Creating new local user account: {user_info['username']}")
# Create a new user in the local database
# 預設權限為 viewer特殊帳號設為 admin
default_role = 'viewer' # 預設權限
# 特殊處理:設定特定帳號為管理員權限
if user_info['username'].lower() == 'ymirliu@panjit.com.tw':
default_role = 'admin'
print(f"[DEBUG] 特殊帳號:{user_info['username']} 設定為管理員權限")
local_user = User(
username=user_info['username'],
# password_hash is no longer needed for login, can be empty or random
password_hash='ldap_authenticated',
role=default_role
)
db.session.add(local_user)
print(f"[DEBUG] 新使用者建立完成,權限: {default_role}")
current_app.logger.info(f"New user created with role: {default_role}")
else:
print(f"[DEBUG] 找到現有使用者: {user_info['username']}")
current_app.logger.info(f"Existing user found: {user_info['username']}")
# Update last_login time
local_user.last_login = datetime.now()
db.session.commit()
# Step 3: Log in the user with Flask-Login
login_user(local_user)
print(f"[DEBUG] 使用者登入成功: {username}")
current_app.logger.info(f"User successfully logged in: {username}")
return redirect(url_for('temp_spec.spec_list'))
else:
# LDAP 驗證失敗
print(f"[DEBUG] LDAP 驗證失敗: {username}")
current_app.logger.warning(f"LDAP authentication failed for: {username}")
flash('AD帳號或密碼錯誤請檢查後重新輸入', 'danger')
except Exception as e:
# 系統錯誤
print(f"[DEBUG] 系統錯誤: {str(e)}")
current_app.logger.error(f"Login system error for user {username}: {str(e)}")
flash('系統登入發生錯誤,請稍後再試或聯繫系統管理員', 'danger')
return render_template('login.html')

View File

@@ -3,7 +3,7 @@ from flask import Blueprint, render_template, request, redirect, url_for, flash,
from flask_login import login_required, current_user
from datetime import datetime, timedelta
from models import TempSpec, db, Upload, SpecHistory
from utils import editor_or_admin_required, add_history_log, admin_required, send_email
from utils import editor_or_admin_required, add_history_log, admin_required, send_email, process_recipients
from ldap_utils import get_ldap_group_members
import os
import shutil
@@ -269,30 +269,29 @@ def activate_spec(spec_id):
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已生效!", 'success')
# --- Start of Email Notification Example ---
# Get recipient list from a predefined LDAP group
# NOTE: 'TempSpec_Approvers' is an example group name. Replace with the actual group name.
recipients = get_ldap_group_members('TempSpec_Approvers')
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已正式生效"
# Using f-strings and triple quotes for a readable HTML body
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已由管理員啟用,並正式生效。</p>
<p>詳細資訊請登入系統查看。</p>
<p>生效日期: {spec.start_date.strftime('%Y-%m-%d')}<br>
結束日期: {spec.end_date.strftime('%Y-%m-%d')}</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
else:
# Log a warning if no recipients were found, but don't block the main process
current_app.logger.warning(f"Could not find recipients in LDAP group 'TempSpec_Approvers' for spec {spec.id}.")
# --- End of Email Notification Example ---
# --- Start of Dynamic Email Notification ---
recipients_str = request.form.get('recipients')
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已正式生效"
# Using f-strings and triple quotes for a readable HTML body
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已由管理員啟用,並正式生效。</p>
<p>詳細資訊請登入系統查看。</p>
<p>生效日期: {spec.start_date.strftime('%Y-%m-%d')}<br>
結束日期: {spec.end_date.strftime('%Y-%m-%d')}</p>
<p>申請人: {spec.applicant}</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent activation notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
return redirect(url_for('temp_spec.spec_list'))
return render_template('activate_spec.html', spec=spec)
@@ -311,6 +310,30 @@ def terminate_spec(spec_id):
spec.termination_reason = reason
spec.end_date = datetime.today().date()
add_history_log(spec.id, '終止', f"原因: {reason}")
# --- Start of Dynamic Email Notification ---
recipients_str = request.form.get('recipients')
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已提早終止"
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已被提早終止。</p>
<p>終止日期: <b>{spec.end_date.strftime('%Y-%m-%d')}</b></p>
<p>申請人: {spec.applicant}</p>
<p>終止原因: {reason}</p>
<p>詳細資訊請登入系統查看。</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent termination notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已被提早終止。", 'warning')
return redirect(url_for('temp_spec.spec_list'))
@@ -384,6 +407,28 @@ def extend_spec(spec_id):
details += f",並上傳新檔案 '{new_upload.filename}'"
add_history_log(spec.id, '展延', details)
# --- Start of Dynamic Email Notification ---
recipients_str = request.form.get('recipients')
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已展延"
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已成功展延。</p>
<p>新的結束日期為: <b>{spec.end_date.strftime('%Y-%m-%d')}</b></p>
<p>申請人: {spec.applicant}</p>
<p>詳細資訊請登入系統查看。</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent extension notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已成功展延!", 'success')
return redirect(url_for('temp_spec.spec_list'))