from flask import Flask, render_template, request, redirect, url_for, jsonify, send_from_directory import mysql.connector import csv import io import os def get_connection_details(): details = {} base_dir = os.path.dirname(os.path.abspath(__file__)) settings_file = os.path.join(base_dir, 'setting.txt') with open(settings_file, 'r') as f: for line in f: key, value = line.strip().split(':', 1) details[key.strip()] = value.strip() return details app = Flask(__name__) def get_db_connection(): config = get_connection_details() conn = mysql.connector.connect( host=config.get('Host'), port=config.get('Port'), user=config.get('Username'), password=config.get('Password'), database=config.get('Database') ) return conn @app.route('/') def index(): conn = get_db_connection() cursor = conn.cursor(dictionary=True) search_query = request.args.get('search', '') if search_query: query = ("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT WHERE plant LIKE %s OR department LIKE %s OR csv_id LIKE %s OR position LIKE %s OR extension LIKE %s") search_term = f"%{search_query}%" cursor.execute(query, (search_term, search_term, search_term, search_term, search_term)) else: cursor.execute("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT") data = cursor.fetchall() conn.close() return render_template('index.html', data=data, search_query=search_query) @app.route('/batch_upload', methods=['POST']) def batch_upload(): if 'csv_file' not in request.files: return redirect(url_for('index')) file = request.files['csv_file'] if file.filename == '': return redirect(url_for('index')) if file: conn = get_db_connection() cursor = conn.cursor() try: # Read the CSV file content stream = io.StringIO(file.stream.read().decode('Big5', errors='ignore')) csv_reader = csv.reader(stream) next(csv_reader, None) # Skip header row for i, row in enumerate(csv_reader): if len(row) < 5: # Ensure row has enough columns print(f"Skipping malformed row {i+2}: {row}") continue plant = row[0] department = row[1] csv_id = row[2] position = row[3] extension = row[4] # Check if record exists based on plant and csv_id cursor.execute("SELECT id FROM EXT WHERE plant = %s AND csv_id = %s", (plant, csv_id)) existing_record = cursor.fetchone() if existing_record: # Update existing record update_query = ("UPDATE EXT SET department=%s, position=%s, extension=%s WHERE id=%s") cursor.execute(update_query, (department, position, extension, existing_record[0])) else: # Insert new record insert_query = ("INSERT INTO EXT (plant, department, csv_id, position, extension) VALUES (%s, %s, %s, %s, %s)") cursor.execute(insert_query, (plant, department, csv_id, position, extension)) conn.commit() return jsonify({"status": "success"}) except Exception as e: conn.rollback() return jsonify({"status": "error", "message": str(e)}), 500 finally: conn.close() return jsonify({"status": "error", "message": "No file uploaded or file is empty."}), 400 @app.route('/validate_csv_data', methods=['POST']) def validate_csv_data(): data = request.get_json() validated_rows = [] conn = get_db_connection() cursor = conn.cursor(dictionary=True) for row_data in data: plant = row_data.get('plant') csv_id = row_data.get('csv_id') department = row_data.get('department') position = row_data.get('position') extension = row_data.get('extension') cursor.execute("SELECT * FROM EXT WHERE plant = %s AND csv_id = %s", (plant, csv_id)) existing_record = cursor.fetchone() if existing_record: diff = {} status = "no_change" if existing_record['department'] != department: diff['department'] = {'old': existing_record['department'], 'new': department} status = "update" if existing_record['position'] != position: diff['position'] = {'old': existing_record['position'], 'new': position} status = "update" if existing_record['extension'] != extension: diff['extension'] = {'old': existing_record['extension'], 'new': extension} status = "update" validated_rows.append({ 'original_data': row_data, 'status': status, 'diff': diff }) else: validated_rows.append({ 'original_data': row_data, 'status': "new", 'diff': {} }) conn.close() return jsonify(validated_rows) @app.route('/download_template') def download_template(): return send_from_directory('samples', 'template.csv', as_attachment=True) @app.route('/add', methods=['POST']) def add_extension(): conn = get_db_connection() cursor = conn.cursor() data = request.get_json() add_query = ("INSERT INTO EXT " "(plant, department, csv_id, position, extension) " "VALUES (%s, %s, %s, %s, %s)") data_tuple = (data['plant'], data['department'], data['name'], data['position'], data['extension']) cursor.execute(add_query, data_tuple) conn.commit() conn.close() return jsonify({"status": "success"}) @app.route('/update/', methods=['POST']) def update_extension(id): conn = get_db_connection() cursor = conn.cursor() data = request.get_json() update_query = ("UPDATE EXT SET plant=%s, department=%s, csv_id=%s, position=%s, extension=%s WHERE id=%s") data_tuple = (data['plant'], data['department'], data['name'], data['position'], data['extension'], id) cursor.execute(update_query, data_tuple) conn.commit() conn.close() return jsonify({"status": "success"}) @app.route('/delete/', methods=['POST']) def delete_extension(id): conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM EXT WHERE id = %s", (id,)) conn.commit() conn.close() return jsonify({"status": "success"}) if __name__ == '__main__': app.run(debug=True)