Files
TEMP_spec_system_V3/utils.py
beabigegg dcd15baf3f 4TH
2025-08-28 13:20:29 +08:00

302 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from docxtpl import DocxTemplate, InlineImage
from docx.shared import Mm
from docx2pdf import convert
import os
import re
from functools import wraps
from flask_login import current_user
from flask import abort
from bs4 import BeautifulSoup, NavigableString, Tag
import pythoncom
import mistune
from PIL import Image
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
def _resolve_image_path(src: str) -> str:
"""
將 HTML 圖片 src 轉換為本地檔案絕對路徑
支援 /static/... 路徑與相對路徑
"""
if src.startswith('/'):
static_index = src.find('/static/')
if static_index != -1:
img_path_rel = src[static_index+1:] # 移除開頭斜線
return os.path.join(BASE_DIR, img_path_rel)
return os.path.join(BASE_DIR, src.lstrip('/'))
import logging
DEBUG_LOG = False # 生產環境關閉 debug 訊息
def _process_markdown_sections(doc, md_content):
from bs4 import BeautifulSoup, Tag
from PIL import Image
from docxtpl import InlineImage
from docx.shared import Mm
def log(msg):
if DEBUG_LOG:
print(f"[DEBUG] {msg}")
def resolve_image(src):
if src.startswith('/'):
static_index = src.find('/static/')
if static_index != -1:
path_rel = src[static_index + 1:]
return os.path.join(BASE_DIR, path_rel)
return os.path.join(BASE_DIR, src.lstrip('/'))
def extract_table_text(table_tag):
lines = []
for i, row in enumerate(table_tag.find_all("tr")):
cells = row.find_all(["td", "th"])
row_text = " | ".join(cell.get_text(strip=True) for cell in cells)
lines.append(row_text)
if i == 0:
lines.append(" | ".join(["---"] * len(cells)))
return "\n".join(lines)
results = []
if not md_content:
log("Markdown content is empty")
return results
html = mistune.html(md_content)
soup = BeautifulSoup(html, 'lxml')
for elem in soup.body.children:
if isinstance(elem, Tag):
if elem.name == 'table':
table_text = extract_table_text(elem)
log(f"[表格] {table_text}")
results.append({'text': table_text, 'image': None})
continue
if elem.name in ['p', 'div']:
for child in elem.children:
if isinstance(child, Tag) and child.name == 'img' and child.has_attr('src'):
try:
img_path = resolve_image(child['src'])
if os.path.exists(img_path):
with Image.open(img_path) as im:
width_px = im.width
width_mm = min(width_px * 25.4 / 96, 130)
image = InlineImage(doc, img_path, width=Mm(width_mm))
log(f"[圖片] {img_path}, 寬: {width_mm:.2f} mm")
results.append({'text': None, 'image': image})
else:
log(f"[警告] 圖片不存在: {img_path}")
except Exception as e:
log(f"[錯誤] 圖片處理失敗: {e}")
else:
text = child.get_text(strip=True) if hasattr(child, 'get_text') else str(child).strip()
if text:
log(f"[文字] {text}")
results.append({'text': text, 'image': None})
return results
def fill_template(values, template_path, output_word_path, output_pdf_path):
from docxtpl import DocxTemplate
import pythoncom
from docx2pdf import convert
doc = DocxTemplate(template_path)
# 填入 contextNone 改為空字串
context = {k: (v if v is not None else '') for k, v in values.items()}
# 更新後版本:處理 Markdown → sections支援圖片+表格+段落)
context["change_before_sections"] = _process_markdown_sections(doc, context.get("change_before", ""))
context["change_after_sections"] = _process_markdown_sections(doc, context.get("change_after", ""))
# 渲染
doc.render(context)
doc.save(output_word_path)
# 轉 PDF
try:
pythoncom.CoInitialize()
convert(output_word_path, output_pdf_path)
except Exception as e:
print(f"PDF conversion failed: {e}")
raise
finally:
pythoncom.CoUninitialize()
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or current_user.role != 'admin':
abort(403) # Forbidden
return f(*args, **kwargs)
return decorated_function
def editor_or_admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or current_user.role not in ['editor', 'admin']:
abort(403) # Forbidden
return f(*args, **kwargs)
return decorated_function
def add_history_log(spec_id, action, details=""):
"""新增一筆操作歷史紀錄"""
from models import db, SpecHistory
history_entry = SpecHistory(
spec_id=spec_id,
user_id=current_user.id,
action=action,
details=details
)
db.session.add(history_entry)
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from flask import current_app
def process_recipients(recipients_str):
"""
處理收件者字串,支援個人郵件和群組展開
輸入格式: "email1,email2,group:GroupName"
返回: 展開後的郵件地址列表
"""
print(f"[RECIPIENTS DEBUG] 開始處理收件者: {recipients_str}")
if not recipients_str:
print(f"[RECIPIENTS DEBUG] 收件者字串為空")
return []
recipients = [item.strip() for item in recipients_str.split(',') if item.strip()]
final_emails = []
for recipient in recipients:
print(f"[RECIPIENTS DEBUG] 處理收件者項目: {recipient}")
if recipient.startswith('group:'):
# 這是一個群組,需要展開
group_name = recipient[6:] # 移除 'group:' 前綴
print(f"[RECIPIENTS DEBUG] 發現群組: {group_name}")
try:
from ldap_utils import get_ldap_group_members
group_emails = get_ldap_group_members(group_name)
print(f"[RECIPIENTS DEBUG] 群組 {group_name} 包含 {len(group_emails)} 個成員")
for email in group_emails:
if email and email not in final_emails:
final_emails.append(email)
print(f"[RECIPIENTS DEBUG] 添加群組成員郵件: {email}")
except Exception as e:
print(f"[RECIPIENTS ERROR] 群組 {group_name} 展開失敗: {e}")
else:
# 這是個人郵件地址
if recipient and recipient not in final_emails:
final_emails.append(recipient)
print(f"[RECIPIENTS DEBUG] 添加個人郵件: {recipient}")
print(f"[RECIPIENTS DEBUG] 最終收件者列表 ({len(final_emails)} 個): {final_emails}")
return final_emails
def send_email(to_addrs, subject, body):
"""
Sends an email using the SMTP settings from the config.
Supports both authenticated (Port 465/587) and unauthenticated (Port 25) methods.
"""
print(f"[EMAIL DEBUG] 開始發送郵件...")
print(f"[EMAIL DEBUG] 收件者數量: {len(to_addrs)}")
print(f"[EMAIL DEBUG] 收件者: {to_addrs}")
print(f"[EMAIL DEBUG] 主旨: {subject}")
try:
# 取得 SMTP 設定
smtp_server = current_app.config['SMTP_SERVER']
smtp_port = current_app.config['SMTP_PORT']
use_tls = current_app.config.get('SMTP_USE_TLS', False)
use_ssl = current_app.config.get('SMTP_USE_SSL', False)
sender_email = current_app.config['SMTP_SENDER_EMAIL']
sender_password = current_app.config.get('SMTP_SENDER_PASSWORD', '')
auth_required = current_app.config.get('SMTP_AUTH_REQUIRED', False)
print(f"[EMAIL DEBUG] SMTP 設定:")
print(f"[EMAIL DEBUG] - 伺服器: {smtp_server}:{smtp_port}")
print(f"[EMAIL DEBUG] - 使用 TLS: {use_tls}")
print(f"[EMAIL DEBUG] - 使用 SSL: {use_ssl}")
print(f"[EMAIL DEBUG] - 寄件者: {sender_email}")
print(f"[EMAIL DEBUG] - 需要認證: {auth_required}")
print(f"[EMAIL DEBUG] - 有密碼: {'' if sender_password else ''}")
# 建立郵件內容
print(f"[EMAIL DEBUG] 建立郵件內容...")
msg = MIMEText(body, 'html', 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
msg['From'] = sender_email
msg['To'] = ', '.join(to_addrs)
print(f"[EMAIL DEBUG] 郵件內容建立完成")
# 連接 SMTP 伺服器
if use_ssl and smtp_port == 465:
# Port 465 使用 SSL
print(f"[EMAIL DEBUG] 使用 SSL 連接 SMTP 伺服器 {smtp_server}:{smtp_port}...")
server = smtplib.SMTP_SSL(smtp_server, smtp_port)
else:
# Port 25 或 587 使用一般連接
print(f"[EMAIL DEBUG] 連接 SMTP 伺服器 {smtp_server}:{smtp_port}...")
server = smtplib.SMTP(smtp_server, smtp_port)
print(f"[EMAIL DEBUG] SMTP 伺服器連接成功")
if use_tls and smtp_port == 587:
print(f"[EMAIL DEBUG] 啟用 TLS...")
server.starttls()
print(f"[EMAIL DEBUG] TLS 啟用成功")
# 只在需要認證時才登入
if auth_required and sender_password:
print(f"[EMAIL DEBUG] 登入 SMTP 伺服器...")
server.login(sender_email, sender_password)
print(f"[EMAIL DEBUG] SMTP 登入成功")
else:
print(f"[EMAIL DEBUG] 使用匿名發送Port 25 無需認證)")
# 發送郵件
print(f"[EMAIL DEBUG] 發送郵件...")
result = server.sendmail(sender_email, to_addrs, msg.as_string())
print(f"[EMAIL DEBUG] 郵件發送結果: {result}")
server.quit()
print(f"[EMAIL DEBUG] SMTP 連接已關閉")
print(f"[EMAIL SUCCESS] 郵件成功發送至: {', '.join(to_addrs)}")
return True
except smtplib.SMTPAuthenticationError as e:
print(f"[EMAIL ERROR] SMTP 認證失敗: {e}")
print(f"[EMAIL ERROR] 請檢查寄件者帳號和密碼設定")
return False
except smtplib.SMTPConnectError as e:
print(f"[EMAIL ERROR] SMTP 連接失敗: {e}")
print(f"[EMAIL ERROR] 請檢查 SMTP 伺服器設定")
return False
except smtplib.SMTPRecipientsRefused as e:
print(f"[EMAIL ERROR] 收件者被拒絕: {e}")
print(f"[EMAIL ERROR] 請檢查收件者郵件地址")
return False
except Exception as e:
print(f"[EMAIL ERROR] 郵件發送失敗: {type(e).__name__}: {e}")
import traceback
print(f"[EMAIL ERROR] 詳細錯誤:")
traceback.print_exc()
return False