# -*- coding: utf-8 -*- from flask import Blueprint, render_template, request, redirect, url_for, flash, send_file, current_app, jsonify, abort from flask_login import login_required, current_user from datetime import datetime, timedelta from utils.timezone import taiwan_now, format_taiwan_time from models import TempSpec, db, Upload, SpecHistory from utils import editor_or_admin_required, add_history_log, admin_required, send_email, process_recipients import os import shutil import jwt import requests from werkzeug.utils import secure_filename from docxtpl import DocxTemplate temp_spec_bp = Blueprint('temp_spec', __name__) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Enforce authentication for this blueprint (except OnlyOffice callback) @temp_spec_bp.before_request def before_request(): """Ensure every request under this blueprint comes from an authenticated user.""" # Allow OnlyOffice callback without authentication if request.endpoint == 'temp_spec.onlyoffice_callback': return None if not current_user.is_authenticated: return redirect(url_for('auth.login')) def _generate_next_spec_code(): """Generate the next temporary spec code based on ROC year/month and a sequence.""" now = taiwan_now() roc_year = now.year - 1911 prefix = f"PE{roc_year}{now.strftime('%m')}" latest_spec = TempSpec.query.filter( TempSpec.spec_code.startswith(prefix) ).order_by(TempSpec.spec_code.desc()).first() if latest_spec: last_seq = int(latest_spec.spec_code[-2:]) new_seq = last_seq + 1 else: new_seq = 1 return f"{prefix}{new_seq:02d}" def get_file_uri(filename: str) -> str: """Return a public URL for a generated file stored under /static/generated.""" return url_for('static', filename=f"generated/{filename}", _external=True) @temp_spec_bp.route('/create', methods=['GET', 'POST']) @editor_or_admin_required def create_temp_spec(): if request.method == 'POST': spec_code = _generate_next_spec_code() form_data = request.form now = taiwan_now() spec = TempSpec( spec_code=spec_code, title=form_data['theme'], applicant=form_data['applicant'], status='pending_approval', created_at=now, start_date=now.date(), end_date=(now + timedelta(days=30)).date() ) db.session.add(spec) db.session.flush() context = { 'serial_number': spec_code, 'theme': form_data.get('theme', ''), 'package': form_data.get('package', ''), 'lot_number': form_data.get('lot_number', ''), 'equipment_type': form_data.get('equipment_type', ''), 'applicant': form_data.get('applicant', ''), 'applicant_phone': form_data.get('applicant_phone', ''), 'start_date': now.strftime('%Y-%m-%d'), 'end_date': (now + timedelta(days=30)).strftime('%Y-%m-%d'), } selected_stations = request.form.getlist('station') station_keys = ['probing', 'dicing', 'diebond', 'wirebond', 'solder', 'molding', 'degate', 'deflash', 'plating', 'trimform', 'marking', 'tmtt', 'other'] for key in station_keys: context[f's_{key}'] = 'Y' if key in selected_stations else 'N' selected_tccs_level = form_data.get('tccs_level') level_keys = ['l1', 'l2', 'l3', 'l4'] for key in level_keys: context[f't_{key}'] = 'Y' if key == selected_tccs_level else 'N' selected_tccs_4m = form_data.get('tccs_4m') m_keys = ['man', 'machine', 'material', 'method', 'env'] for key in m_keys: context[f't_{key}'] = 'Y' if key == selected_tccs_4m else 'N' generated_folder = os.path.join(current_app.static_folder, 'generated') os.makedirs(generated_folder, exist_ok=True) template_path = os.path.join(BASE_DIR, 'template_with_placeholders.docx') new_file_path = os.path.join(generated_folder, f"{spec_code}.docx") if not os.path.exists(template_path): flash('Word template (template_with_placeholders.docx) was not found.', 'danger') db.session.rollback() return redirect(url_for('temp_spec.spec_list')) doc = DocxTemplate(template_path) doc.render(context) doc.save(new_file_path) add_history_log(spec.id, 'create', f"Created draft spec: {spec.spec_code}") db.session.commit() return redirect(url_for('temp_spec.edit_spec', spec_id=spec.id)) return render_template('create_temp_spec_form.html') @temp_spec_bp.route('/edit/') @editor_or_admin_required def edit_spec(spec_id): spec = TempSpec.query.get_or_404(spec_id) doc_filename = f"{spec.spec_code}.docx" doc_physical_path = os.path.join(current_app.static_folder, 'generated', doc_filename) if not os.path.exists(doc_physical_path): flash(f"Generated document not found: {doc_filename}", 'danger') return redirect(url_for('temp_spec.spec_list')) doc_url = get_file_uri(doc_filename) callback_url = url_for('temp_spec.onlyoffice_callback', spec_id=spec_id, _external=True) if '127.0.0.1' in doc_url or 'localhost' in doc_url: doc_url = doc_url.replace('127.0.0.1:12013', 'panjit-tempspec-nginx:80').replace('localhost:12013', 'panjit-tempspec-nginx:80') doc_url = doc_url.replace('127.0.0.1', 'panjit-tempspec-nginx').replace('localhost', 'panjit-tempspec-nginx') callback_url = callback_url.replace('127.0.0.1:12013', 'panjit-tempspec-nginx:80').replace('localhost:12013', 'panjit-tempspec-nginx:80') callback_url = callback_url.replace('127.0.0.1', 'panjit-tempspec-nginx').replace('localhost', 'panjit-tempspec-nginx') oo_secret = current_app.config['ONLYOFFICE_JWT_SECRET'] file_key = f"{spec.id}_{int(os.path.getmtime(doc_physical_path))}" payload = { "document": { "fileType": "docx", "key": file_key, "title": doc_filename, "url": doc_url }, "documentType": "word", "editorConfig": { "callbackUrl": callback_url, "user": { "id": str(current_user.id), "name": current_user.name or current_user.username }, "customization": { "autosave": True, "forcesave": True, "chat": False, "comments": True, "help": False }, "mode": "edit" } } token = jwt.encode(payload, oo_secret, algorithm='HS256') config = payload.copy() config['token'] = token return render_template( 'onlyoffice_editor.html', spec=spec, config=config, onlyoffice_url=current_app.config['ONLYOFFICE_URL'] ) @temp_spec_bp.route('/onlyoffice-callback/', methods=['POST']) def onlyoffice_callback(spec_id): data = request.json status = data.get('status') current_app.logger.info(f"OnlyOffice callback for spec {spec_id}: status={status}, data={data}") if status in [2, 6]: try: if 'url' not in data: current_app.logger.error(f"OnlyOffice callback missing URL for spec {spec_id}") return jsonify({"error": 1, "message": "Missing document URL"}) token = data.get('token') if token: try: oo_secret = current_app.config['ONLYOFFICE_JWT_SECRET'] jwt.decode(token, oo_secret, algorithms=['HS256']) except jwt.InvalidTokenError: current_app.logger.error(f"Invalid JWT token in OnlyOffice callback for spec {spec_id}") return jsonify({"error": 1, "message": "Invalid token"}) download_url = data['url'] download_url = download_url.replace('localhost:12015', 'panjit-tempspec-onlyoffice:80') download_url = download_url.replace('127.0.0.1:12015', 'panjit-tempspec-onlyoffice:80') current_app.logger.info(f"Downloading updated document from: {data['url']} -> {download_url}") response = requests.get(download_url, timeout=30) response.raise_for_status() spec = TempSpec.query.get_or_404(spec_id) doc_filename = f"{spec.spec_code}.docx" file_path = os.path.join(current_app.static_folder, 'generated', doc_filename) os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'wb') as f: f.write(response.content) current_app.logger.info(f"Successfully saved updated document for spec {spec_id} to {file_path}") spec.updated_at = taiwan_now() db.session.commit() except requests.RequestException as e: current_app.logger.error(f"Failed to download document from OnlyOffice for spec {spec_id}: {e}") return jsonify({"error": 1, "message": f"Download failed: {str(e)}"}) except Exception as e: current_app.logger.error(f"OnlyOffice callback error for spec {spec_id}: {e}") return jsonify({"error": 1, "message": str(e)}) elif status == 1: current_app.logger.info(f"Document {spec_id} is being edited") elif status == 4: current_app.logger.info(f"Document {spec_id} closed without changes") elif status == 7: current_app.logger.error(f"OnlyOffice error for document {spec_id}") return jsonify({"error": 1, "message": "OnlyOffice reported an error"}) return jsonify({"error": 0}) @temp_spec_bp.route('/list') @login_required def spec_list(): page = request.args.get('page', 1, type=int) query = request.args.get('query', '') status_filter = request.args.get('status', '') specs_query = TempSpec.query if query: search_term = f"%{query}%" specs_query = specs_query.filter( db.or_( TempSpec.spec_code.ilike(search_term), TempSpec.title.ilike(search_term) ) ) if status_filter: specs_query = specs_query.filter(TempSpec.status == status_filter) pagination = specs_query.order_by(TempSpec.created_at.desc()).paginate( page=page, per_page=15, error_out=False ) specs = pagination.items from datetime import date today = date.today() return render_template( 'spec_list.html', specs=specs, pagination=pagination, query=query, status=status_filter, today=today ) @temp_spec_bp.route('/activate/', methods=['GET', 'POST']) @admin_required def activate_spec(spec_id): spec = TempSpec.query.get_or_404(spec_id) if request.method == 'POST': uploaded_file = request.files.get('signed_file') if not uploaded_file or uploaded_file.filename == '': flash('Please upload a file first.', 'danger') return redirect(url_for('temp_spec.activate_spec', spec_id=spec.id)) filename = secure_filename(f"{spec.spec_code}_signed_{taiwan_now().strftime('%Y%m%d%H%M%S')}.pdf") upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER']) os.makedirs(upload_folder, exist_ok=True) file_path = os.path.join(upload_folder, filename) uploaded_file.save(file_path) new_upload = Upload( temp_spec_id=spec.id, filename=filename, upload_time=taiwan_now() ) db.session.add(new_upload) spec.status = 'active' recipients_str = request.form.get('recipients') if recipients_str: spec.notification_emails = recipients_str.strip() add_history_log(spec.id, 'activate', f"Uploaded signed file '{filename}'") db.session.commit() flash(f"Spec '{spec.spec_code}' is now active.", 'success') # --- Start of Dynamic Email Notification --- if recipients_str: recipients = process_recipients(recipients_str) if recipients: subject = f"[TempSpec Notice] Spec '{spec.spec_code}' is active" # Using f-strings and triple quotes for a readable HTML body body = f"""

Hello,

Temp spec {spec.spec_code} - {spec.title} has been approved and is now active.

Start date: {spec.start_date.strftime('%Y-%m-%d')}
End date: {spec.end_date.strftime('%Y-%m-%d')}

Applicant: {spec.applicant}

Please sign in to the system for additional details.

This notification was sent automatically. Please do not reply.

""" send_email(recipients, subject, body) current_app.logger.info(f"Sent activation notification for spec {spec.spec_code} to {len(recipients)} recipients.") # --- End of Dynamic Email Notification --- return redirect(url_for('temp_spec.spec_list')) return render_template('activate_spec.html', spec=spec) @temp_spec_bp.route('/terminate/', methods=['GET', 'POST']) @editor_or_admin_required def terminate_spec(spec_id): spec = TempSpec.query.get_or_404(spec_id) if request.method == 'POST': reason = request.form.get('reason') if not reason: flash('Please provide the reason for early termination.', 'danger') return redirect(url_for('temp_spec.terminate_spec', spec_id=spec.id)) spec.status = 'terminated' spec.termination_reason = reason spec.end_date = taiwan_now().date() add_history_log(spec.id, 'terminate', f"Reason: {reason}") # --- Start of Dynamic Email Notification --- recipients_str = request.form.get('recipients') if not recipients_str and spec.notification_emails: recipients_str = spec.notification_emails if recipients_str: recipients = process_recipients(recipients_str) if recipients: subject = f"[TempSpec Notice] Spec '{spec.spec_code}' was terminated early" body = f"""

Hello,

Temp spec {spec.spec_code} - {spec.title} has been terminated.

Termination date: {spec.end_date.strftime('%Y-%m-%d')}

Applicant: {spec.applicant}

Reason: {reason}

Please sign in to the system for additional details.

This notification was sent automatically. Please do not reply.

""" send_email(recipients, subject, body) current_app.logger.info(f"Sent termination notification for spec {spec.spec_code} to {len(recipients)} recipients.") # --- End of Dynamic Email Notification --- db.session.commit() flash(f"Spec '{spec.spec_code}' has been terminated early.", 'warning') return redirect(url_for('temp_spec.spec_list')) return render_template('terminate_spec.html', spec=spec, saved_emails=spec.notification_emails) @temp_spec_bp.route('/download_initial_word/') @login_required def download_initial_word(spec_id): spec = TempSpec.query.get_or_404(spec_id) if current_user.role not in ['editor', 'admin']: flash('You do not have permission to download the Word document.', 'danger') abort(403) generated_folder = os.path.join(current_app.static_folder, 'generated') word_path = os.path.join(generated_folder, f"{spec.spec_code}.docx") if not os.path.exists(word_path): flash('The original Word document cannot be found. It might have been moved or deleted.', 'danger') return redirect(url_for('temp_spec.spec_list')) return send_file(word_path, as_attachment=True) @temp_spec_bp.route('/download_signed/') @login_required def download_signed_pdf(spec_id): latest_upload = Upload.query.filter_by(temp_spec_id=spec_id).order_by(Upload.upload_time.desc()).first() if not latest_upload: flash('No signed files have been uploaded yet.', 'danger') return redirect(url_for('temp_spec.spec_list')) upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER']) return send_file(os.path.join(upload_folder, latest_upload.filename), as_attachment=True) @temp_spec_bp.route('/extend/', methods=['GET', 'POST']) @editor_or_admin_required def extend_spec(spec_id): spec = TempSpec.query.get_or_404(spec_id) if spec.extension_count >= 2: flash('This temporary specification has reached the extension limit (2 times) and the total 90-day duration. Extension is no longer allowed.', 'danger') return redirect(url_for('temp_spec.spec_list')) if request.method == 'POST': new_end_date_str = request.form.get('new_end_date') uploaded_file = request.files.get('new_file') if not uploaded_file or uploaded_file.filename == '': flash('Please upload the supporting PDF before extending.', 'danger') return redirect(url_for('temp_spec.extend_spec', spec_id=spec.id)) if not new_end_date_str: flash('Please choose a new end date.', 'danger') return redirect(url_for('temp_spec.extend_spec', spec_id=spec.id)) spec.end_date = datetime.strptime(new_end_date_str, '%Y-%m-%d').date() spec.extension_count += 1 spec.status = 'active' filename = secure_filename(f"{spec.spec_code}_extension_{spec.extension_count}_{taiwan_now().strftime('%Y%m%d')}.pdf") upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER']) os.makedirs(upload_folder, exist_ok=True) file_path = os.path.join(upload_folder, filename) uploaded_file.save(file_path) new_upload = Upload( temp_spec_id=spec.id, filename=filename, upload_time=taiwan_now() ) db.session.add(new_upload) details = f"Extended end date to {spec.end_date.strftime('%Y-%m-%d')}" details += f", uploaded file '{new_upload.filename}'" add_history_log(spec.id, 'extend', details) # --- Start of Dynamic Email Notification --- recipients_str = request.form.get('recipients') if not recipients_str and spec.notification_emails: recipients_str = spec.notification_emails if recipients_str: spec.notification_emails = recipients_str.strip() if recipients_str: recipients = process_recipients(recipients_str) if recipients: subject = f"[TempSpec Notice] Spec '{spec.spec_code}' was extended" body = f"""

Hello,

Temp spec {spec.spec_code} - {spec.title} has been extended.

New end date: {spec.end_date.strftime('%Y-%m-%d')}

Applicant: {spec.applicant}

Please sign in to the system for additional details.

This notification was sent automatically. Please do not reply.

""" send_email(recipients, subject, body) current_app.logger.info(f"Sent extension notification for spec {spec.spec_code} to {len(recipients)} recipients.") # --- End of Dynamic Email Notification --- db.session.commit() flash(f"Spec '{spec.spec_code}' was extended successfully.", 'success') return redirect(url_for('temp_spec.spec_list')) default_new_end_date = spec.end_date + timedelta(days=30) return render_template('extend_spec.html', spec=spec, default_new_end_date=default_new_end_date, saved_emails=spec.notification_emails) @temp_spec_bp.route('/history/') @login_required def spec_history(spec_id): spec = TempSpec.query.get_or_404(spec_id) history = SpecHistory.query.filter_by(spec_id=spec_id).order_by(SpecHistory.timestamp.desc()).all() return render_template('spec_history.html', spec=spec, history=history) @temp_spec_bp.route('/delete/', methods=['POST']) @admin_required def delete_spec(spec_id): spec = TempSpec.query.get_or_404(spec_id) spec_code = spec.spec_code files_to_delete = [] generated_folder = os.path.join(current_app.static_folder, 'generated') files_to_delete.append(os.path.join(generated_folder, f"{spec.spec_code}.docx")) upload_folder = os.path.join(BASE_DIR, current_app.config['UPLOAD_FOLDER']) for upload_record in spec.uploads: files_to_delete.append(os.path.join(upload_folder, upload_record.filename)) for f_path in files_to_delete: try: if os.path.exists(f_path): os.remove(f_path) except Exception as e: current_app.logger.error(f"Failed to delete file: {f_path}, reason: {e}") db.session.delete(spec) db.session.commit() flash(f"Spec '{spec_code}' and related files were deleted successfully.", 'success') return redirect(url_for('temp_spec.spec_list'))