"""Utility helpers used across the application.""" from __future__ import annotations import logging import smtplib from email.header import Header from email.mime.text import MIMEText from functools import wraps from typing import Callable, Iterable, List from flask import abort, current_app from flask_login import current_user logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Access control decorators # --------------------------------------------------------------------------- def _require_roles(roles: Iterable[str]) -> Callable: """Abort with 403 when the current user does not own one of the roles.""" def decorator(view_func: Callable) -> Callable: @wraps(view_func) def wrapper(*args, **kwargs): if not current_user.is_authenticated or current_user.role not in roles: abort(403) return view_func(*args, **kwargs) return wrapper return decorator def admin_required(view_func: Callable) -> Callable: """Limit a view to administrator accounts only.""" return _require_roles({"admin"})(view_func) def editor_or_admin_required(view_func: Callable) -> Callable: """Limit a view to editor or administrator accounts.""" return _require_roles({"editor", "admin"})(view_func) # --------------------------------------------------------------------------- # History helpers # --------------------------------------------------------------------------- def add_history_log(spec_id: int, action: str, details: str = "") -> None: """Persist a history record for an action executed on a spec.""" from models import SpecHistory, db entry = SpecHistory( spec_id=spec_id, user_id=current_user.id if current_user.is_authenticated else None, action=action, details=details, ) db.session.add(entry) db.session.commit() # --------------------------------------------------------------------------- # Notification helpers # --------------------------------------------------------------------------- def process_recipients(recipients_str: str | None) -> List[str]: """Convert a semicolon separated string into a list of unique addresses.""" if not recipients_str: return [] normalized = recipients_str.replace("\r", ";").replace("\n", ";") candidates = [item.strip() for item in normalized.split(";") if item.strip()] seen: set[str] = set() result: List[str] = [] for email in candidates: if email not in seen: seen.add(email) result.append(email) return result def send_email(to_addrs: Iterable[str], subject: str, body: str) -> bool: """Send an HTML email using the SMTP configuration in Flask config.""" recipients = list(to_addrs) if not recipients: logger.info("send_email skipped: no recipients provided") return False cfg = current_app.config smtp_server = cfg.get("SMTP_SERVER") smtp_port = int(cfg.get("SMTP_PORT", 25)) use_tls = bool(cfg.get("SMTP_USE_TLS", False)) use_ssl = bool(cfg.get("SMTP_USE_SSL", False)) sender = cfg.get("SMTP_SENDER_EMAIL") password = cfg.get("SMTP_SENDER_PASSWORD", "") auth_required = bool(cfg.get("SMTP_AUTH_REQUIRED", False)) logger.debug( "Preparing email: subject=%s recipients=%s server=%s:%s use_tls=%s use_ssl=%s auth_required=%s", subject, recipients, smtp_server, smtp_port, use_tls, use_ssl, auth_required, ) message = MIMEText(body, "html", "utf-8") message["Subject"] = Header(subject, "utf-8") message["From"] = sender message["To"] = ", ".join(recipients) try: if use_ssl and smtp_port == 465: server = smtplib.SMTP_SSL(smtp_server, smtp_port) else: server = smtplib.SMTP(smtp_server, smtp_port) if use_tls and smtp_port == 587: server.starttls() if auth_required and password: server.login(sender, password) server.sendmail(sender, recipients, message.as_string()) server.quit() logger.info("Email sent to %s (subject=%s)", recipients, subject) return True except smtplib.SMTPException as exc: logger.error("SMTP error while sending email: %s", exc) except Exception as exc: # pragma: no cover logger.exception("Unexpected error while sending email: %s", exc) return False