This commit is contained in:
beabigegg
2025-09-12 08:00:56 +08:00
commit a408ce402d
54 changed files with 5626 additions and 0 deletions

0
routes/__init__.py Normal file
View File

96
routes/admin.py Normal file
View File

@@ -0,0 +1,96 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from models import User, db
from utils import admin_required
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
@admin_bp.route('/users')
@login_required
@admin_required
def user_list():
"""顯示所有使用者列表,供管理員管理權限。"""
# MySQL 不支援 nullslast(),改用 COALESCE 處理 NULL 值
users = User.query.order_by(User.last_login.desc(), User.username).all()
return render_template('user_management.html', users=users)
@admin_bp.route('/users/edit/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def edit_user_role(user_id):
"""編輯使用者權限。僅允許修改角色。"""
user = User.query.get_or_404(user_id)
new_role = request.form.get('role')
if new_role not in ['viewer', 'editor', 'admin']:
flash('無效的權限設定!', 'danger')
return redirect(url_for('admin.user_list'))
# 防止管理員修改自己的角色導致失去管理權限
if user.id == current_user.id and user.role == 'admin' and new_role != 'admin':
flash('無法變更自己的管理員權限!', 'danger')
return redirect(url_for('admin.user_list'))
old_role = user.role
user.role = new_role
db.session.commit()
flash(f"使用者 '{user.username}' 的權限已從 '{old_role}' 更新為 '{new_role}'", 'success')
return redirect(url_for('admin.user_list'))
@admin_bp.route('/users/delete/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def delete_user(user_id):
"""刪除使用者帳號。"""
# 避免管理員刪除自己
if user_id == current_user.id:
flash('無法刪除自己的帳號!', 'danger')
return redirect(url_for('admin.user_list'))
user = User.query.get_or_404(user_id)
username = user.username
# 檢查是否為最後一個管理員
admin_count = User.query.filter_by(role='admin').count()
if user.role == 'admin' and admin_count <= 1:
flash('無法刪除最後一個管理員帳號!', 'danger')
return redirect(url_for('admin.user_list'))
db.session.delete(user)
db.session.commit()
flash(f"使用者 '{username}' 已被刪除。", 'success')
return redirect(url_for('admin.user_list'))
@admin_bp.route('/users/set-admin', methods=['POST'])
@login_required
@admin_required
def set_admin():
"""設定特定AD帳號為管理員權限。"""
username = request.form.get('username', '').strip()
if not username:
flash('請輸入有效的AD帳號', 'danger')
return redirect(url_for('admin.user_list'))
# 查找或建立使用者
user = User.query.filter_by(username=username).first()
if not user:
# 建立新的使用者記錄
user = User(
username=username,
password_hash='ldap_authenticated', # LDAP使用者不需要本地密碼
role='admin'
)
db.session.add(user)
db.session.commit()
flash(f"已為 AD 帳號 '{username}' 建立管理員權限。", 'success')
else:
# 更新現有使用者權限
old_role = user.role
user.role = 'admin'
db.session.commit()
flash(f"已將 '{username}' 的權限從 '{old_role}' 更新為 'admin'", 'success')
return redirect(url_for('admin.user_list'))

80
routes/api.py Normal file
View File

@@ -0,0 +1,80 @@
from flask import Blueprint, request, jsonify
from flask_login import login_required
from ldap_utils import search_ldap_principals, search_ldap_groups
api_bp = Blueprint('api', __name__, url_prefix='/api')
@api_bp.route('/ldap-search', methods=['GET'])
@login_required
def ldap_search():
"""
API endpoint for searching LDAP principals.
Returns JSON data for Tom Select dropdown.
"""
search_term = request.args.get('q', '').strip()
print(f"[DEBUG] LDAP API 搜尋請求: '{search_term}'")
if not search_term or len(search_term) < 2:
print(f"[DEBUG] 搜尋詞太短,返回空結果")
return jsonify([])
try:
print(f"[DEBUG] 開始 LDAP 搜尋: '{search_term}'")
# 搜尋使用者
user_results = search_ldap_principals(search_term, limit=15)
print(f"[DEBUG] LDAP 使用者搜尋結果數量: {len(user_results)}")
# 搜尋群組
group_results = search_ldap_groups(search_term, limit=5)
print(f"[DEBUG] LDAP 群組搜尋結果數量: {len(group_results)}")
# Format results for Tom Select
formatted_results = []
# 加入使用者結果
for result in user_results:
formatted_result = {
'value': result['email'],
'text': f"👤 {result['name']} ({result['email']})",
'type': 'user'
}
formatted_results.append(formatted_result)
print(f"[DEBUG] 格式化使用者結果: {result['name']} - {result['email']}")
# 加入群組結果
for result in group_results:
# 對群組,使用群組名稱作為 value在發送郵件時再展開成員
formatted_result = {
'value': f"group:{result['name']}", # 特殊前綴表示這是群組
'text': f"👥 {result['name']} ({result['member_count']} 成員)",
'type': 'group'
}
formatted_results.append(formatted_result)
print(f"[DEBUG] 格式化群組結果: {result['name']} - {result['member_count']} 成員")
print(f"[DEBUG] 返回 {len(formatted_results)} 個搜尋結果")
return jsonify(formatted_results)
except Exception as e:
print(f"[DEBUG] LDAP search API error: {e}")
import traceback
traceback.print_exc()
return jsonify({'error': 'Search failed'}), 500
@api_bp.route('/debug-form', methods=['POST'])
@login_required
def debug_form():
"""接收前端表單除錯資訊"""
try:
data = request.get_json()
print(f"[FRONTEND DEBUG] 收到前端除錯資料:")
print(f"[FRONTEND DEBUG] selectedValues: {data.get('selectedValues')}")
print(f"[FRONTEND DEBUG] recipientValue: {data.get('recipientValue')}")
print(f"[FRONTEND DEBUG] hiddenFieldValue: {data.get('hiddenFieldValue')}")
print(f"[FRONTEND DEBUG] tomSelectValue: {data.get('tomSelectValue')}")
return jsonify({'status': 'received'})
except Exception as e:
print(f"[FRONTEND DEBUG] 錯誤: {e}")
return jsonify({'error': str(e)}), 400

99
routes/auth.py Normal file
View File

@@ -0,0 +1,99 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app
from flask_login import login_user, logout_user, login_required, current_user
from ldap_utils import authenticate_ldap_user
from models import User, db
from datetime import datetime
import logging
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('temp_spec.spec_list'))
if request.method == 'POST':
username = request.form['username'].strip()
password = request.form['password']
# 記錄登入嘗試
print(f"[DEBUG] 登入嘗試 - 帳號: {username}")
current_app.logger.info(f"Login attempt for user: {username}")
# 驗證帳號格式
if '@' not in username:
print(f"[DEBUG] 帳號格式錯誤 - 缺少 @ 符號: {username}")
current_app.logger.warning(f"Invalid username format (missing @): {username}")
flash('請使用完整的 AD 帳號格式 (包含 @domain)', 'warning')
return render_template('login.html')
try:
# Step 1: Authenticate against LDAP
print(f"[DEBUG] 準備進行 LDAP 驗證: {username}")
current_app.logger.info(f"Attempting LDAP authentication for: {username}")
user_info = authenticate_ldap_user(username, password)
print(f"[DEBUG] LDAP 驗證結果: {user_info}")
if user_info:
print(f"[DEBUG] LDAP 驗證成功: {username}")
current_app.logger.info(f"LDAP authentication successful for: {username}")
# Step 2: User authenticated successfully, find or create local user
local_user = User.query.filter_by(username=user_info['username']).first()
if not local_user:
print(f"[DEBUG] 建立新的本地使用者帳號: {user_info['username']}")
current_app.logger.info(f"Creating new local user account: {user_info['username']}")
# Create a new user in the local database
# 預設權限為 viewer特殊帳號設為 admin
default_role = 'viewer' # 預設權限
# 特殊處理:設定特定帳號為管理員權限
if user_info['username'].lower() == 'ymirliu@panjit.com.tw':
default_role = 'admin'
print(f"[DEBUG] 特殊帳號:{user_info['username']} 設定為管理員權限")
local_user = User(
username=user_info['username'],
# password_hash is no longer needed for login, can be empty or random
password_hash='ldap_authenticated',
role=default_role
)
db.session.add(local_user)
print(f"[DEBUG] 新使用者建立完成,權限: {default_role}")
current_app.logger.info(f"New user created with role: {default_role}")
else:
print(f"[DEBUG] 找到現有使用者: {user_info['username']}")
current_app.logger.info(f"Existing user found: {user_info['username']}")
# Update last_login time
local_user.last_login = datetime.now()
db.session.commit()
# Step 3: Log in the user with Flask-Login
login_user(local_user)
print(f"[DEBUG] 使用者登入成功: {username}")
current_app.logger.info(f"User successfully logged in: {username}")
return redirect(url_for('temp_spec.spec_list'))
else:
# LDAP 驗證失敗
print(f"[DEBUG] LDAP 驗證失敗: {username}")
current_app.logger.warning(f"LDAP authentication failed for: {username}")
flash('AD帳號或密碼錯誤請檢查後重新輸入', 'danger')
except Exception as e:
# 系統錯誤
print(f"[DEBUG] 系統錯誤: {str(e)}")
current_app.logger.error(f"Login system error for user {username}: {str(e)}")
flash('系統登入發生錯誤,請稍後再試或聯繫系統管理員', 'danger')
return render_template('login.html')
@auth_bp.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('auth.login'))

488
routes/temp_spec.py Normal file
View File

@@ -0,0 +1,488 @@
# -*- coding: utf-8 -*-
from flask import Blueprint, render_template, request, redirect, url_for, flash, send_file, current_app, jsonify, abort
from flask_login import login_required, current_user
from datetime import datetime, timedelta
from models import TempSpec, db, Upload, SpecHistory
from utils import editor_or_admin_required, add_history_log, admin_required, send_email, process_recipients
from ldap_utils import get_ldap_group_members
import os
import shutil
import jwt
import requests
from werkzeug.utils import secure_filename
from docxtpl import DocxTemplate
temp_spec_bp = Blueprint('temp_spec', __name__)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 移除 @login_required
@temp_spec_bp.before_request
def before_request():
"""在處理此藍圖中的任何請求之前,確保使用者已登入。"""
# 移除全域的登入驗證,改為在各個路由單獨設定
pass
def _generate_next_spec_code():
"""
產生下一個暫時規範編號。
規則: PE + 民國年(3碼) + 月份(2碼) + 流水號(2碼)
"""
now = datetime.now()
roc_year = now.year - 1911
prefix = f"PE{roc_year}{now.strftime('%m')}"
latest_spec = TempSpec.query.filter(
TempSpec.spec_code.startswith(prefix)
).order_by(TempSpec.spec_code.desc()).first()
if latest_spec:
last_seq = int(latest_spec.spec_code[-2:])
new_seq = last_seq + 1
else:
new_seq = 1
return f"{prefix}{new_seq:02d}"
def get_file_uri(filename):
"""產生 Flask 應用程式可以存取到該檔案的 URL"""
return url_for('static', filename=f"generated/{filename}", _external=True)
@temp_spec_bp.route('/create', methods=['GET', 'POST'])
@editor_or_admin_required
def create_temp_spec():
if request.method == 'POST':
spec_code = _generate_next_spec_code()
form_data = request.form
now = datetime.now()
# 1. 在資料庫中建立紀錄
spec = TempSpec(
spec_code=spec_code,
title=form_data['theme'],
applicant=form_data['applicant'],
status='pending_approval',
created_at=now,
start_date=now.date(),
end_date=(now + timedelta(days=30)).date()
)
db.session.add(spec)
db.session.flush()
# 2. 準備要填入 Word 範本的 context
context = {
'serial_number': spec_code,
'theme': form_data.get('theme', ''),
'package': form_data.get('package', ''),
'lot_number': form_data.get('lot_number', ''),
'equipment_type': form_data.get('equipment_type', ''),
'applicant': form_data.get('applicant', ''),
'applicant_phone': form_data.get('applicant_phone', ''),
'start_date': now.strftime('%Y-%m-%d'),
'end_date': (now + timedelta(days=30)).strftime('%Y-%m-%d'),
}
# 3. 處理勾選框邏輯
selected_stations = request.form.getlist('station')
station_keys = ['probing', 'dicing', 'diebond', 'wirebond', 'solder', 'molding',
'degate', 'deflash', 'plating', 'trimform', 'marking', 'tmtt', 'other']
for key in station_keys:
context[f's_{key}'] = '' if key in selected_stations else ''
selected_tccs_level = form_data.get('tccs_level')
level_keys = ['l1', 'l2', 'l3', 'l4']
for key in level_keys:
context[f't_{key}'] = '' if key == selected_tccs_level else ''
selected_tccs_4m = form_data.get('tccs_4m')
m_keys = ['man', 'machine', 'material', 'method', 'env']
for key in m_keys:
context[f't_{key}'] = '' if key == selected_tccs_4m else ''
# 4. 渲染 Word 範本
generated_folder = os.path.join(current_app.static_folder, 'generated')
os.makedirs(generated_folder, exist_ok=True)
template_path = os.path.join(BASE_DIR, 'template_with_placeholders.docx')
new_file_path = os.path.join(generated_folder, f"{spec_code}.docx")
if not os.path.exists(template_path):
flash('找不到 Word 範本檔案 (template_with_placeholders.docx)!', 'danger')
db.session.rollback()
return redirect(url_for('temp_spec.spec_list'))
doc = DocxTemplate(template_path)
doc.render(context)
doc.save(new_file_path)
# 5. 提交資料庫並重新導向
add_history_log(spec.id, '建立', f"建立暫時規範草稿: {spec.spec_code}")
db.session.commit()
return redirect(url_for('temp_spec.edit_spec', spec_id=spec.id))
# GET 請求:顯示建立表單
return render_template('create_temp_spec_form.html')
@temp_spec_bp.route('/edit/<int:spec_id>')
@editor_or_admin_required
def edit_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
doc_filename = f"{spec.spec_code}.docx"
doc_physical_path = os.path.join(current_app.static_folder, 'generated', doc_filename)
if not os.path.exists(doc_physical_path):
flash(f'找不到文件檔案: {doc_filename}', 'danger')
return redirect(url_for('temp_spec.spec_list'))
# --- START: 修正文件下載與回呼的 URL ---
# 1. 產生標準的文件 URL 和回呼 URL
doc_url = get_file_uri(doc_filename)
callback_url = url_for('temp_spec.onlyoffice_callback', spec_id=spec_id, _external=True)
# 2. 如果是在開發環境,將 URL 中的 localhost 替換為 Docker 可存取的地址
if '127.0.0.1' in doc_url or 'localhost' in doc_url:
# 同時修正 doc_url 和 callback_url
doc_url = doc_url.replace('127.0.0.1', 'host.docker.internal').replace('localhost', 'host.docker.internal')
callback_url = callback_url.replace('127.0.0.1', 'host.docker.internal').replace('localhost', 'host.docker.internal')
# --- END: 修正文件下載與回呼的 URL ---
oo_secret = current_app.config['ONLYOFFICE_JWT_SECRET']
payload = {
"document": {
"fileType": "docx",
"key": f"{spec.id}_{int(os.path.getmtime(doc_physical_path))}",
"title": doc_filename,
"url": doc_url # <-- 使用修正後的 doc_url
},
"documentType": "word",
"editorConfig": {
"callbackUrl": callback_url, # <-- 使用修正後的回呼 URL
"user": { "id": str(current_user.id), "name": current_user.username },
"customization": { "autosave": True, "forcesave": True }
}
}
token = jwt.encode(payload, oo_secret, algorithm='HS256')
config = payload.copy()
config['token'] = token
return render_template(
'onlyoffice_editor.html',
spec=spec,
config=config,
onlyoffice_url=current_app.config['ONLYOFFICE_URL']
)
# 這個路由不需要登入驗證,因為是 ONLYOFFICE Server 在呼叫它
@temp_spec_bp.route('/onlyoffice-callback/<int:spec_id>', methods=['POST'])
def onlyoffice_callback(spec_id):
data = request.json
if data.get('status') == 2:
try:
response = requests.get(data['url'], timeout=10)
response.raise_for_status()
spec = TempSpec.query.get_or_404(spec_id)
doc_filename = f"{spec.spec_code}.docx"
file_path = os.path.join(current_app.static_folder, 'generated', doc_filename)
with open(file_path, 'wb') as f:
f.write(response.content)
except Exception as e:
current_app.logger.error(f"ONLYOFFICE callback error for spec {spec_id}: {e}")
return jsonify({"error": 1, "message": str(e)})
return jsonify({"error": 0})
# --- 其他既有路由 ---
@temp_spec_bp.route('/list')
@login_required # 補上登入驗證
def spec_list():
page = request.args.get('page', 1, type=int)
query = request.args.get('query', '')
status_filter = request.args.get('status', '')
specs_query = TempSpec.query
if query:
search_term = f"%{query}%"
specs_query = specs_query.filter(
db.or_(
TempSpec.spec_code.ilike(search_term),
TempSpec.title.ilike(search_term)
)
)
if status_filter:
specs_query = specs_query.filter(TempSpec.status == status_filter)
pagination = specs_query.order_by(TempSpec.created_at.desc()).paginate(
page=page, per_page=15, error_out=False
)
specs = pagination.items
# --- START: 新增的程式碼 ---
# 取得今天的日期,並傳給模板
from datetime import date
today = date.today()
# --- END: 新增的程式碼 ---
return render_template(
'spec_list.html',
specs=specs,
pagination=pagination,
query=query,
status=status_filter,
today=today # <-- 將 today 傳遞到模板
)
@temp_spec_bp.route('/activate/<int:spec_id>', methods=['GET', 'POST'])
@admin_required
def activate_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
if request.method == 'POST':
uploaded_file = request.files.get('signed_file')
if not uploaded_file or uploaded_file.filename == '':
flash('您必須上傳一個檔案。', 'danger')
return redirect(url_for('temp_spec.activate_spec', spec_id=spec.id))
filename = secure_filename(f"{spec.spec_code}_signed_{datetime.now().strftime('%Y%m%d%H%M%S')}.pdf")
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
os.makedirs(upload_folder, exist_ok=True)
file_path = os.path.join(upload_folder, filename)
uploaded_file.save(file_path)
new_upload = Upload(
temp_spec_id=spec.id,
filename=filename,
upload_time=datetime.now()
)
db.session.add(new_upload)
spec.status = 'active'
# 儲存通知郵件清單到資料庫
recipients_str = request.form.get('recipients')
if recipients_str:
spec.notification_emails = recipients_str.strip()
add_history_log(spec.id, '啟用', f"上傳已簽核檔案 '{filename}'")
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已生效!", 'success')
# --- Start of Dynamic Email Notification ---
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已正式生效"
# Using f-strings and triple quotes for a readable HTML body
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已由管理員啟用,並正式生效。</p>
<p>詳細資訊請登入系統查看。</p>
<p>生效日期: {spec.start_date.strftime('%Y-%m-%d')}<br>
結束日期: {spec.end_date.strftime('%Y-%m-%d')}</p>
<p>申請人: {spec.applicant}</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent activation notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
return redirect(url_for('temp_spec.spec_list'))
return render_template('activate_spec.html', spec=spec)
@temp_spec_bp.route('/terminate/<int:spec_id>', methods=['GET', 'POST'])
@editor_or_admin_required
def terminate_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
if request.method == 'POST':
reason = request.form.get('reason')
if not reason:
flash('請填寫提早結束的原因。', 'danger')
return redirect(url_for('temp_spec.terminate_spec', spec_id=spec.id))
spec.status = 'terminated'
spec.termination_reason = reason
spec.end_date = datetime.today().date()
add_history_log(spec.id, '終止', f"原因: {reason}")
# --- Start of Dynamic Email Notification ---
# 優先使用表單提交的收件者,如果沒有則使用資料庫中儲存的
recipients_str = request.form.get('recipients')
if not recipients_str and spec.notification_emails:
recipients_str = spec.notification_emails
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已提早終止"
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已被提早終止。</p>
<p>終止日期: <b>{spec.end_date.strftime('%Y-%m-%d')}</b></p>
<p>申請人: {spec.applicant}</p>
<p>終止原因: {reason}</p>
<p>詳細資訊請登入系統查看。</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent termination notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已被提早終止。", 'warning')
return redirect(url_for('temp_spec.spec_list'))
# 將儲存的郵件清單傳遞給模板
return render_template('terminate_spec.html', spec=spec, saved_emails=spec.notification_emails)
@temp_spec_bp.route('/download_initial_word/<int:spec_id>')
@login_required
def download_initial_word(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
if current_user.role not in ['editor', 'admin']:
flash('權限不足,無法下載 Word 檔案。', 'danger')
abort(403)
generated_folder = os.path.join(current_app.static_folder, 'generated')
word_path = os.path.join(generated_folder, f"{spec.spec_code}.docx")
if not os.path.exists(word_path):
flash('找不到最初產生的 Word 檔案,可能已被刪除或移動。', 'danger')
return redirect(url_for('temp_spec.spec_list'))
return send_file(word_path, as_attachment=True)
@temp_spec_bp.route('/download_signed/<int:spec_id>')
@login_required # 補上登入驗證
def download_signed_pdf(spec_id):
latest_upload = Upload.query.filter_by(temp_spec_id=spec_id).order_by(Upload.upload_time.desc()).first()
if not latest_upload:
flash('找不到任何已上傳的簽核檔案。', 'danger')
return redirect(url_for('temp_spec.spec_list'))
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
return send_file(os.path.join(upload_folder, latest_upload.filename), as_attachment=True)
@temp_spec_bp.route('/extend/<int:spec_id>', methods=['GET', 'POST'])
@editor_or_admin_required
def extend_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
if request.method == 'POST':
new_end_date_str = request.form.get('new_end_date')
uploaded_file = request.files.get('new_file')
if not uploaded_file or uploaded_file.filename == '':
flash('您必須上傳新的佐證檔案才能展延。', 'danger')
return redirect(url_for('temp_spec.extend_spec', spec_id=spec.id))
if not new_end_date_str:
flash('請選擇新的結束日期', 'danger')
return redirect(url_for('temp_spec.extend_spec', spec_id=spec.id))
spec.end_date = datetime.strptime(new_end_date_str, '%Y-%m-%d').date()
spec.extension_count += 1
spec.status = 'active'
filename = secure_filename(f"{spec.spec_code}_extension_{spec.extension_count}_{datetime.now().strftime('%Y%m%d')}.pdf")
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
os.makedirs(upload_folder, exist_ok=True)
file_path = os.path.join(upload_folder, filename)
uploaded_file.save(file_path)
new_upload = Upload(
temp_spec_id=spec.id,
filename=filename,
upload_time=datetime.now()
)
db.session.add(new_upload)
details = f"展延結束日期至 {spec.end_date.strftime('%Y-%m-%d')}"
details += f",並上傳新檔案 '{new_upload.filename}'"
add_history_log(spec.id, '展延', details)
# --- Start of Dynamic Email Notification ---
# 優先使用表單提交的收件者,如果沒有則使用資料庫中儲存的
recipients_str = request.form.get('recipients')
if not recipients_str and spec.notification_emails:
recipients_str = spec.notification_emails
# 如果使用者有更新郵件清單,儲存回資料庫
if recipients_str:
spec.notification_emails = recipients_str.strip()
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已展延"
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已成功展延。</p>
<p>新的結束日期為: <b>{spec.end_date.strftime('%Y-%m-%d')}</b></p>
<p>申請人: {spec.applicant}</p>
<p>詳細資訊請登入系統查看。</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent extension notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已成功展延!", 'success')
return redirect(url_for('temp_spec.spec_list'))
default_new_end_date = spec.end_date + timedelta(days=30)
# 將儲存的郵件清單傳遞給模板
return render_template('extend_spec.html', spec=spec, default_new_end_date=default_new_end_date, saved_emails=spec.notification_emails)
@temp_spec_bp.route('/history/<int:spec_id>')
@login_required # 補上登入驗證
def spec_history(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
history = SpecHistory.query.filter_by(spec_id=spec_id).order_by(SpecHistory.timestamp.desc()).all()
return render_template('spec_history.html', spec=spec, history=history)
@temp_spec_bp.route('/delete/<int:spec_id>', methods=['POST'])
@admin_required
def delete_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
spec_code = spec.spec_code
files_to_delete = []
generated_folder = os.path.join(current_app.static_folder, 'generated')
files_to_delete.append(os.path.join(generated_folder, f"{spec.spec_code}.docx"))
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
for upload_record in spec.uploads:
files_to_delete.append(os.path.join(upload_folder, upload_record.filename))
for f_path in files_to_delete:
try:
if os.path.exists(f_path):
os.remove(f_path)
except Exception as e:
current_app.logger.error(f"刪除檔案失敗: {f_path}, 原因: {e}")
db.session.delete(spec)
db.session.commit()
flash(f"規範 '{spec_code}' 及其所有相關檔案已成功刪除。", 'success')
return redirect(url_for('temp_spec.spec_list'))

29
routes/upload.py Normal file
View File

@@ -0,0 +1,29 @@
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename
import os
import time
upload_bp = Blueprint('upload', __name__)
@upload_bp.route('/image', methods=['POST'])
def upload_image():
file = request.files.get('file')
if not file:
return jsonify({'error': 'No file part'}), 400
# 建立一個獨特的檔名
extension = os.path.splitext(file.filename)[1]
filename = f"{int(time.time())}_{secure_filename(file.filename)}"
# 確保上傳資料夾存在
# 為了讓圖片能被網頁存取,我們將它存在 static 資料夾下
image_folder = os.path.join(current_app.static_folder, 'uploads', 'images')
os.makedirs(image_folder, exist_ok=True)
file_path = os.path.join(image_folder, filename)
file.save(file_path)
# 回傳 TinyMCE 需要的 JSON 格式
# 路徑必須是相對於網域根目錄的 URL
location = f"/static/uploads/images/{filename}"
return jsonify({'location': location})