Files
TEMP_spec_system/routes/temp_spec.py
2025-09-21 11:37:39 +08:00

562 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from flask import Blueprint, render_template, request, redirect, url_for, flash, send_file, current_app, jsonify, abort
from flask_login import login_required, current_user
from datetime import datetime, timedelta
from utils.timezone import taiwan_now, format_taiwan_time
from models import TempSpec, db, Upload, SpecHistory
from utils import editor_or_admin_required, add_history_log, admin_required, send_email, process_recipients
from ldap_utils import get_ldap_group_members
import os
import shutil
import jwt
import requests
from werkzeug.utils import secure_filename
from docxtpl import DocxTemplate
temp_spec_bp = Blueprint('temp_spec', __name__)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 移除 @login_required
@temp_spec_bp.before_request
def before_request():
"""在處理此藍圖中的任何請求之前,確保使用者已登入。"""
# 移除全域的登入驗證,改為在各個路由單獨設定
pass
def _generate_next_spec_code():
"""
產生下一個暫時規範編號。
規則: PE + 民國年(3碼) + 月份(2碼) + 流水號(2碼)
"""
now = taiwan_now()
roc_year = now.year - 1911
prefix = f"PE{roc_year}{now.strftime('%m')}"
latest_spec = TempSpec.query.filter(
TempSpec.spec_code.startswith(prefix)
).order_by(TempSpec.spec_code.desc()).first()
if latest_spec:
last_seq = int(latest_spec.spec_code[-2:])
new_seq = last_seq + 1
else:
new_seq = 1
return f"{prefix}{new_seq:02d}"
def get_file_uri(filename):
"""產生 Flask 應用程式可以存取到該檔案的 URL"""
return url_for('static', filename=f"generated/{filename}", _external=True)
@temp_spec_bp.route('/create', methods=['GET', 'POST'])
@editor_or_admin_required
def create_temp_spec():
if request.method == 'POST':
spec_code = _generate_next_spec_code()
form_data = request.form
now = taiwan_now()
# 1. 在資料庫中建立紀錄
spec = TempSpec(
spec_code=spec_code,
title=form_data['theme'],
applicant=form_data['applicant'],
status='pending_approval',
created_at=now,
start_date=now.date(),
end_date=(now + timedelta(days=30)).date()
)
db.session.add(spec)
db.session.flush()
# 2. 準備要填入 Word 範本的 context
context = {
'serial_number': spec_code,
'theme': form_data.get('theme', ''),
'package': form_data.get('package', ''),
'lot_number': form_data.get('lot_number', ''),
'equipment_type': form_data.get('equipment_type', ''),
'applicant': form_data.get('applicant', ''),
'applicant_phone': form_data.get('applicant_phone', ''),
'start_date': now.strftime('%Y-%m-%d'),
'end_date': (now + timedelta(days=30)).strftime('%Y-%m-%d'),
}
# 3. 處理勾選框邏輯
selected_stations = request.form.getlist('station')
station_keys = ['probing', 'dicing', 'diebond', 'wirebond', 'solder', 'molding',
'degate', 'deflash', 'plating', 'trimform', 'marking', 'tmtt', 'other']
for key in station_keys:
context[f's_{key}'] = '' if key in selected_stations else ''
selected_tccs_level = form_data.get('tccs_level')
level_keys = ['l1', 'l2', 'l3', 'l4']
for key in level_keys:
context[f't_{key}'] = '' if key == selected_tccs_level else ''
selected_tccs_4m = form_data.get('tccs_4m')
m_keys = ['man', 'machine', 'material', 'method', 'env']
for key in m_keys:
context[f't_{key}'] = '' if key == selected_tccs_4m else ''
# 4. 渲染 Word 範本
generated_folder = os.path.join(current_app.static_folder, 'generated')
os.makedirs(generated_folder, exist_ok=True)
template_path = os.path.join(BASE_DIR, 'template_with_placeholders.docx')
new_file_path = os.path.join(generated_folder, f"{spec_code}.docx")
if not os.path.exists(template_path):
flash('找不到 Word 範本檔案 (template_with_placeholders.docx)!', 'danger')
db.session.rollback()
return redirect(url_for('temp_spec.spec_list'))
doc = DocxTemplate(template_path)
doc.render(context)
doc.save(new_file_path)
# 5. 提交資料庫並重新導向
add_history_log(spec.id, '建立', f"建立暫時規範草稿: {spec.spec_code}")
db.session.commit()
return redirect(url_for('temp_spec.edit_spec', spec_id=spec.id))
# GET 請求:顯示建立表單
return render_template('create_temp_spec_form.html')
@temp_spec_bp.route('/edit/<int:spec_id>')
@editor_or_admin_required
def edit_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
doc_filename = f"{spec.spec_code}.docx"
doc_physical_path = os.path.join(current_app.static_folder, 'generated', doc_filename)
if not os.path.exists(doc_physical_path):
flash(f'找不到文件檔案: {doc_filename}', 'danger')
return redirect(url_for('temp_spec.spec_list'))
# --- START: 修正文件下載與回呼的 URL ---
# 1. 產生標準的文件 URL 和回呼 URL
doc_url = get_file_uri(doc_filename)
callback_url = url_for('temp_spec.onlyoffice_callback', spec_id=spec_id, _external=True)
# 2. 修正容器間通訊的 URL使用正確的容器名稱
if '127.0.0.1' in doc_url or 'localhost' in doc_url:
# 在 Docker Compose 環境中OnlyOffice 應該透過 nginx 存取 Flask 應用
doc_url = doc_url.replace('127.0.0.1:12013', 'panjit-tempspec-nginx:80').replace('localhost:12013', 'panjit-tempspec-nginx:80')
doc_url = doc_url.replace('127.0.0.1', 'panjit-tempspec-nginx').replace('localhost', 'panjit-tempspec-nginx')
callback_url = callback_url.replace('127.0.0.1:12013', 'panjit-tempspec-nginx:80').replace('localhost:12013', 'panjit-tempspec-nginx:80')
callback_url = callback_url.replace('127.0.0.1', 'panjit-tempspec-nginx').replace('localhost', 'panjit-tempspec-nginx')
# --- END: 修正文件下載與回呼的 URL ---
oo_secret = current_app.config['ONLYOFFICE_JWT_SECRET']
# 生成唯一的文件密鑰,包含更新時間戳
file_key = f"{spec.id}_{int(os.path.getmtime(doc_physical_path))}"
payload = {
"document": {
"fileType": "docx",
"key": file_key,
"title": doc_filename,
"url": doc_url # <-- 使用修正後的 doc_url
},
"documentType": "word",
"editorConfig": {
"callbackUrl": callback_url, # <-- 使用修正後的回呼 URL
"user": { "id": str(current_user.id), "name": current_user.username },
"customization": {
"autosave": True,
"forcesave": True,
"chat": False,
"comments": True,
"help": False
},
"mode": "edit"
}
}
token = jwt.encode(payload, oo_secret, algorithm='HS256')
config = payload.copy()
config['token'] = token
return render_template(
'onlyoffice_editor.html',
spec=spec,
config=config,
onlyoffice_url=current_app.config['ONLYOFFICE_URL']
)
# 這個路由不需要登入驗證,因為是 ONLYOFFICE Server 在呼叫它
@temp_spec_bp.route('/onlyoffice-callback/<int:spec_id>', methods=['POST'])
def onlyoffice_callback(spec_id):
data = request.json
status = data.get('status')
# 記錄所有回調狀態以便調試
current_app.logger.info(f"OnlyOffice callback for spec {spec_id}: status={status}, data={data}")
# OnlyOffice 狀態說明:
# 0 - 文件未找到
# 1 - 文件編輯中
# 2 - 文件準備保存
# 3 - 文件保存中
# 4 - 文件已關閉,無變更
# 6 - 文件編輯中,但已強制保存
# 7 - 發生錯誤
if status in [2, 6]: # 文件需要保存或已強制保存
try:
if 'url' not in data:
current_app.logger.error(f"OnlyOffice callback missing URL for spec {spec_id}")
return jsonify({"error": 1, "message": "Missing document URL"})
# 驗證 JWT Token (如果有)
token = data.get('token')
if token:
try:
oo_secret = current_app.config['ONLYOFFICE_JWT_SECRET']
jwt.decode(token, oo_secret, algorithms=['HS256'])
except jwt.InvalidTokenError:
current_app.logger.error(f"Invalid JWT token in OnlyOffice callback for spec {spec_id}")
return jsonify({"error": 1, "message": "Invalid token"})
# 修正 OnlyOffice 回調中的 URL 以供容器間通信使用
download_url = data['url']
# 將外部訪問 URL 轉換為容器間通信 URL
download_url = download_url.replace('localhost:12015', 'panjit-tempspec-onlyoffice:80')
download_url = download_url.replace('127.0.0.1:12015', 'panjit-tempspec-onlyoffice:80')
current_app.logger.info(f"Downloading updated document from: {data['url']} -> {download_url}")
response = requests.get(download_url, timeout=30)
response.raise_for_status()
# 保存文件
spec = TempSpec.query.get_or_404(spec_id)
doc_filename = f"{spec.spec_code}.docx"
file_path = os.path.join(current_app.static_folder, 'generated', doc_filename)
# 確保目錄存在
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'wb') as f:
f.write(response.content)
current_app.logger.info(f"Successfully saved updated document for spec {spec_id} to {file_path}")
# 更新資料庫中的修改時間
spec.updated_at = taiwan_now()
db.session.commit()
except requests.RequestException as e:
current_app.logger.error(f"Failed to download document from OnlyOffice for spec {spec_id}: {e}")
return jsonify({"error": 1, "message": f"Download failed: {str(e)}"})
except Exception as e:
current_app.logger.error(f"OnlyOffice callback error for spec {spec_id}: {e}")
return jsonify({"error": 1, "message": str(e)})
elif status == 1:
current_app.logger.info(f"Document {spec_id} is being edited")
elif status == 4:
current_app.logger.info(f"Document {spec_id} closed without changes")
elif status == 7:
current_app.logger.error(f"OnlyOffice error for document {spec_id}")
return jsonify({"error": 1, "message": "OnlyOffice reported an error"})
return jsonify({"error": 0})
# --- 其他既有路由 ---
@temp_spec_bp.route('/list')
@login_required # 補上登入驗證
def spec_list():
page = request.args.get('page', 1, type=int)
query = request.args.get('query', '')
status_filter = request.args.get('status', '')
specs_query = TempSpec.query
if query:
search_term = f"%{query}%"
specs_query = specs_query.filter(
db.or_(
TempSpec.spec_code.ilike(search_term),
TempSpec.title.ilike(search_term)
)
)
if status_filter:
specs_query = specs_query.filter(TempSpec.status == status_filter)
pagination = specs_query.order_by(TempSpec.created_at.desc()).paginate(
page=page, per_page=15, error_out=False
)
specs = pagination.items
# --- START: 新增的程式碼 ---
# 取得今天的日期,並傳給模板
from datetime import date
today = date.today()
# --- END: 新增的程式碼 ---
return render_template(
'spec_list.html',
specs=specs,
pagination=pagination,
query=query,
status=status_filter,
today=today # <-- 將 today 傳遞到模板
)
@temp_spec_bp.route('/activate/<int:spec_id>', methods=['GET', 'POST'])
@admin_required
def activate_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
if request.method == 'POST':
uploaded_file = request.files.get('signed_file')
if not uploaded_file or uploaded_file.filename == '':
flash('您必須上傳一個檔案。', 'danger')
return redirect(url_for('temp_spec.activate_spec', spec_id=spec.id))
filename = secure_filename(f"{spec.spec_code}_signed_{taiwan_now().strftime('%Y%m%d%H%M%S')}.pdf")
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
os.makedirs(upload_folder, exist_ok=True)
file_path = os.path.join(upload_folder, filename)
uploaded_file.save(file_path)
new_upload = Upload(
temp_spec_id=spec.id,
filename=filename,
upload_time=taiwan_now()
)
db.session.add(new_upload)
spec.status = 'active'
# 儲存通知郵件清單到資料庫
recipients_str = request.form.get('recipients')
if recipients_str:
spec.notification_emails = recipients_str.strip()
add_history_log(spec.id, '啟用', f"上傳已簽核檔案 '{filename}'")
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已生效!", 'success')
# --- Start of Dynamic Email Notification ---
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已正式生效"
# Using f-strings and triple quotes for a readable HTML body
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已由管理員啟用,並正式生效。</p>
<p>詳細資訊請登入系統查看。</p>
<p>生效日期: {spec.start_date.strftime('%Y-%m-%d')}<br>
結束日期: {spec.end_date.strftime('%Y-%m-%d')}</p>
<p>申請人: {spec.applicant}</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent activation notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
return redirect(url_for('temp_spec.spec_list'))
return render_template('activate_spec.html', spec=spec)
@temp_spec_bp.route('/terminate/<int:spec_id>', methods=['GET', 'POST'])
@editor_or_admin_required
def terminate_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
if request.method == 'POST':
reason = request.form.get('reason')
if not reason:
flash('請填寫提早結束的原因。', 'danger')
return redirect(url_for('temp_spec.terminate_spec', spec_id=spec.id))
spec.status = 'terminated'
spec.termination_reason = reason
spec.end_date = taiwan_now().date()
add_history_log(spec.id, '終止', f"原因: {reason}")
# --- Start of Dynamic Email Notification ---
# 優先使用表單提交的收件者,如果沒有則使用資料庫中儲存的
recipients_str = request.form.get('recipients')
if not recipients_str and spec.notification_emails:
recipients_str = spec.notification_emails
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已提早終止"
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已被提早終止。</p>
<p>終止日期: <b>{spec.end_date.strftime('%Y-%m-%d')}</b></p>
<p>申請人: {spec.applicant}</p>
<p>終止原因: {reason}</p>
<p>詳細資訊請登入系統查看。</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent termination notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已被提早終止。", 'warning')
return redirect(url_for('temp_spec.spec_list'))
# 將儲存的郵件清單傳遞給模板
return render_template('terminate_spec.html', spec=spec, saved_emails=spec.notification_emails)
@temp_spec_bp.route('/download_initial_word/<int:spec_id>')
@login_required
def download_initial_word(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
if current_user.role not in ['editor', 'admin']:
flash('權限不足,無法下載 Word 檔案。', 'danger')
abort(403)
generated_folder = os.path.join(current_app.static_folder, 'generated')
word_path = os.path.join(generated_folder, f"{spec.spec_code}.docx")
if not os.path.exists(word_path):
flash('找不到最初產生的 Word 檔案,可能已被刪除或移動。', 'danger')
return redirect(url_for('temp_spec.spec_list'))
return send_file(word_path, as_attachment=True)
@temp_spec_bp.route('/download_signed/<int:spec_id>')
@login_required # 補上登入驗證
def download_signed_pdf(spec_id):
latest_upload = Upload.query.filter_by(temp_spec_id=spec_id).order_by(Upload.upload_time.desc()).first()
if not latest_upload:
flash('找不到任何已上傳的簽核檔案。', 'danger')
return redirect(url_for('temp_spec.spec_list'))
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
return send_file(os.path.join(upload_folder, latest_upload.filename), as_attachment=True)
@temp_spec_bp.route('/extend/<int:spec_id>', methods=['GET', 'POST'])
@editor_or_admin_required
def extend_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
# 檢查展延次數限制最多展延2次
if spec.extension_count >= 2:
flash('此暫時規範已達展延次數上限2次無法再次展延。總效期已達90天上限。', 'danger')
return redirect(url_for('temp_spec.spec_list'))
if request.method == 'POST':
new_end_date_str = request.form.get('new_end_date')
uploaded_file = request.files.get('new_file')
if not uploaded_file or uploaded_file.filename == '':
flash('您必須上傳新的佐證檔案才能展延。', 'danger')
return redirect(url_for('temp_spec.extend_spec', spec_id=spec.id))
if not new_end_date_str:
flash('請選擇新的結束日期', 'danger')
return redirect(url_for('temp_spec.extend_spec', spec_id=spec.id))
spec.end_date = datetime.strptime(new_end_date_str, '%Y-%m-%d').date()
spec.extension_count += 1
spec.status = 'active'
filename = secure_filename(f"{spec.spec_code}_extension_{spec.extension_count}_{taiwan_now().strftime('%Y%m%d')}.pdf")
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
os.makedirs(upload_folder, exist_ok=True)
file_path = os.path.join(upload_folder, filename)
uploaded_file.save(file_path)
new_upload = Upload(
temp_spec_id=spec.id,
filename=filename,
upload_time=taiwan_now()
)
db.session.add(new_upload)
details = f"展延結束日期至 {spec.end_date.strftime('%Y-%m-%d')}"
details += f",並上傳新檔案 '{new_upload.filename}'"
add_history_log(spec.id, '展延', details)
# --- Start of Dynamic Email Notification ---
# 優先使用表單提交的收件者,如果沒有則使用資料庫中儲存的
recipients_str = request.form.get('recipients')
if not recipients_str and spec.notification_emails:
recipients_str = spec.notification_emails
# 如果使用者有更新郵件清單,儲存回資料庫
if recipients_str:
spec.notification_emails = recipients_str.strip()
if recipients_str:
recipients = process_recipients(recipients_str)
if recipients:
subject = f"[暫規通知] 規範 '{spec.spec_code}' 已展延"
body = f"""
<html>
<body>
<p>您好,</p>
<p>暫時規範 <b>{spec.spec_code} - {spec.title}</b> 已成功展延。</p>
<p>新的結束日期為: <b>{spec.end_date.strftime('%Y-%m-%d')}</b></p>
<p>申請人: {spec.applicant}</p>
<p>詳細資訊請登入系統查看。</p>
<p>此為系統自動發送的通知郵件,請勿直接回覆。</p>
</body>
</html>
"""
send_email(recipients, subject, body)
current_app.logger.info(f"Sent extension notification for spec {spec.spec_code} to {len(recipients)} recipients.")
# --- End of Dynamic Email Notification ---
db.session.commit()
flash(f"規範 '{spec.spec_code}' 已成功展延!", 'success')
return redirect(url_for('temp_spec.spec_list'))
default_new_end_date = spec.end_date + timedelta(days=30)
# 將儲存的郵件清單傳遞給模板
return render_template('extend_spec.html', spec=spec, default_new_end_date=default_new_end_date, saved_emails=spec.notification_emails)
@temp_spec_bp.route('/history/<int:spec_id>')
@login_required # 補上登入驗證
def spec_history(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
history = SpecHistory.query.filter_by(spec_id=spec_id).order_by(SpecHistory.timestamp.desc()).all()
return render_template('spec_history.html', spec=spec, history=history)
@temp_spec_bp.route('/delete/<int:spec_id>', methods=['POST'])
@admin_required
def delete_spec(spec_id):
spec = TempSpec.query.get_or_404(spec_id)
spec_code = spec.spec_code
files_to_delete = []
generated_folder = os.path.join(current_app.static_folder, 'generated')
files_to_delete.append(os.path.join(generated_folder, f"{spec.spec_code}.docx"))
upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER'])
for upload_record in spec.uploads:
files_to_delete.append(os.path.join(upload_folder, upload_record.filename))
for f_path in files_to_delete:
try:
if os.path.exists(f_path):
os.remove(f_path)
except Exception as e:
current_app.logger.error(f"刪除檔案失敗: {f_path}, 原因: {e}")
db.session.delete(spec)
db.session.commit()
flash(f"規範 '{spec_code}' 及其所有相關檔案已成功刪除。", 'success')
return redirect(url_for('temp_spec.spec_list'))