# Files
# TEMP_spec_system_noad/utils/__init__.py
# 2025-09-25 08:44:44 +08:00
#
# 145 lines
# 4.4 KiB
# Python

"""Utility helpers used across the application."""
from __future__ import annotations
import logging
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from functools import wraps
from typing import Callable, Iterable, List
from flask import abort, current_app
from flask_login import current_user
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Access control decorators
# ---------------------------------------------------------------------------
def _require_roles(roles: Iterable[str]) -> Callable:
    """Build a view decorator that aborts with 403 for users lacking a role.

    Args:
        roles: Role names permitted to reach the decorated view. A bare
            string is treated as a single role name.

    Returns:
        A decorator. The wrapped view calls ``abort(403)`` when
        ``current_user`` is not authenticated or when ``current_user.role``
        is not one of ``roles``; otherwise it calls the original view with
        the same arguments and returns its result.
    """
    # Snapshot the roles once. The parameter is only promised to be an
    # Iterable, so a one-shot iterator would be drained by the first
    # membership test and every later request would be rejected. A bare
    # string must count as one role rather than be substring-matched.
    allowed = tuple([roles] if isinstance(roles, str) else roles)

    def decorator(view_func: Callable) -> Callable:
        @wraps(view_func)
        def wrapper(*args, **kwargs):
            if not current_user.is_authenticated or current_user.role not in allowed:
                abort(403)
            return view_func(*args, **kwargs)

        return wrapper

    return decorator
def admin_required(view_func: Callable) -> Callable:
    """Wrap ``view_func`` so that only users whose role is ``admin`` pass.

    Any other caller is rejected by the guard built by ``_require_roles``.
    """
    admin_only = _require_roles({"admin"})
    return admin_only(view_func)
def editor_or_admin_required(view_func: Callable) -> Callable:
    """Wrap ``view_func`` so that only ``editor`` or ``admin`` users pass.

    Any other caller is rejected by the guard built by ``_require_roles``.
    """
    editor_or_admin = _require_roles({"editor", "admin"})
    return editor_or_admin(view_func)
# ---------------------------------------------------------------------------
# History helpers
# ---------------------------------------------------------------------------
def add_history_log(spec_id: int, action: str, details: str = "") -> None:
    """Create a ``SpecHistory`` record for ``spec_id`` and commit it.

    Args:
        spec_id: Identifier stored on the record as ``spec_id``.
        action: Value stored on the record as ``action``.
        details: Optional value stored as ``details``; defaults to ``""``.

    The record's ``user_id`` is ``current_user.id`` when the current user is
    authenticated and ``None`` otherwise. The session is committed before
    returning.
    """
    # Imported inside the function rather than at module level —
    # presumably to avoid a circular import with ``models``; confirm.
    from models import SpecHistory, db

    if current_user.is_authenticated:
        acting_user_id = current_user.id
    else:
        acting_user_id = None

    record = SpecHistory(
        spec_id=spec_id,
        user_id=acting_user_id,
        action=action,
        details=details,
    )
    db.session.add(record)
    db.session.commit()
# ---------------------------------------------------------------------------
# Notification helpers
# ---------------------------------------------------------------------------
def process_recipients(recipients_str: str | None) -> List[str]:
    """Split a recipient string into unique, trimmed addresses.

    Carriage returns and newlines are treated as additional ``;``
    separators. Blank entries are dropped. Duplicates are removed by exact
    (case-sensitive) comparison, keeping the first occurrence and the
    original order. ``None`` or an empty string yields an empty list.
    """
    if not recipients_str:
        return []

    # Fold both line-break characters into the semicolon separator.
    unified = recipients_str.translate(str.maketrans({"\r": ";", "\n": ";"}))
    trimmed = (chunk.strip() for chunk in unified.split(";"))

    # dict keys keep insertion order, giving an ordered de-duplication.
    ordered_unique = dict.fromkeys(address for address in trimmed if address)
    return list(ordered_unique)
def send_email(to_addrs: Iterable[str], subject: str, body: str) -> bool:
    """Send an HTML email using the SMTP settings in the Flask app config.

    Config keys read from ``current_app.config``: ``SMTP_SERVER``,
    ``SMTP_PORT`` (default 25), ``SMTP_USE_TLS``, ``SMTP_USE_SSL``,
    ``SMTP_SENDER_EMAIL``, ``SMTP_SENDER_PASSWORD`` and
    ``SMTP_AUTH_REQUIRED``.

    Args:
        to_addrs: Recipient addresses; consumed once into a list.
        subject: Subject line, encoded as a UTF-8 header.
        body: Message body, sent as ``text/html`` in UTF-8.

    Returns:
        ``True`` when the message was handed to the SMTP server. ``False``
        when there are no recipients or when any error occurs while
        connecting or sending (the error is logged, not raised).
    """
    recipients = list(to_addrs)
    if not recipients:
        logger.info("send_email skipped: no recipients provided")
        return False

    cfg = current_app.config
    smtp_server = cfg.get("SMTP_SERVER")
    smtp_port = int(cfg.get("SMTP_PORT", 25))
    use_tls = bool(cfg.get("SMTP_USE_TLS", False))
    use_ssl = bool(cfg.get("SMTP_USE_SSL", False))
    sender = cfg.get("SMTP_SENDER_EMAIL")
    password = cfg.get("SMTP_SENDER_PASSWORD", "")
    auth_required = bool(cfg.get("SMTP_AUTH_REQUIRED", False))

    logger.debug(
        "Preparing email: subject=%s recipients=%s server=%s:%s use_tls=%s use_ssl=%s auth_required=%s",
        subject,
        recipients,
        smtp_server,
        smtp_port,
        use_tls,
        use_ssl,
        auth_required,
    )

    # Encryption is only applied on the conventional ports (465 for implicit
    # SSL, 587 for STARTTLS). That gating is kept for backward compatibility,
    # but a requested-yet-skipped encryption mode is surfaced in the log
    # instead of silently falling back to a plaintext session.
    ssl_active = use_ssl and smtp_port == 465
    starttls_active = use_tls and smtp_port == 587
    if use_ssl and not ssl_active:
        logger.warning(
            "SMTP_USE_SSL is set but port is %s (not 465); connecting without implicit SSL",
            smtp_port,
        )
    if use_tls and not starttls_active:
        logger.warning(
            "SMTP_USE_TLS is set but port is %s (not 587); STARTTLS will not be issued",
            smtp_port,
        )

    message = MIMEText(body, "html", "utf-8")
    message["Subject"] = Header(subject, "utf-8")
    message["From"] = sender
    message["To"] = ", ".join(recipients)

    try:
        if ssl_active:
            connection = smtplib.SMTP_SSL(smtp_server, smtp_port)
        else:
            connection = smtplib.SMTP(smtp_server, smtp_port)

        # The context manager issues QUIT and closes the socket even when
        # starttls/login/sendmail raises, so a failed send no longer leaks
        # the connection.
        with connection as server:
            if starttls_active:
                server.starttls()
            if auth_required and password:
                server.login(sender, password)
            server.sendmail(sender, recipients, message.as_string())
    except smtplib.SMTPException as exc:
        logger.error("SMTP error while sending email: %s", exc)
    except Exception as exc:  # pragma: no cover
        logger.exception("Unexpected error while sending email: %s", exc)
    else:
        logger.info("Email sent to %s (subject=%s)", recipients, subject)
        return True
    return False