189 lines
6.7 KiB
Python
189 lines
6.7 KiB
Python
|
|
from flask import Flask, render_template, request, redirect, url_for, jsonify, send_from_directory
|
|
import mysql.connector
|
|
import csv
|
|
import io
|
|
import os
|
|
|
|
def get_connection_details(settings_file=None):
    """Read connection settings from a ``key: value`` text file.

    Each non-blank line is split on its first colon, and surrounding
    whitespace is stripped from both the key and the value. Because only the
    first colon separates them, a value may itself contain colons.

    Args:
        settings_file: Optional path to the settings file. When omitted, the
            file ``setting.txt`` located next to this module is read.

    Returns:
        A dict mapping each setting name to its string value.

    Raises:
        OSError: If the settings file cannot be opened.
        ValueError: If a non-blank line does not contain a colon.
    """
    if settings_file is None:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        settings_file = os.path.join(base_dir, 'setting.txt')

    details = {}
    # No explicit encoding: the platform default is kept so that existing
    # settings files continue to load exactly as before.
    with open(settings_file, 'r') as f:
        for line_number, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line:
                # Blank lines (such as a trailing newline) carry no setting.
                continue
            key, separator, value = line.partition(':')
            if not separator:
                # The line content is deliberately left out of the message
                # because the file may hold credentials.
                raise ValueError(
                    f"{settings_file}: line {line_number} is not in "
                    "'key: value' form"
                )
            details[key.strip()] = value.strip()
    return details
|
|
|
|
# Flask application object; the route decorators in this file register their
# view functions on it, and the __main__ guard runs it.
app = Flask(__name__)
|
|
|
|
def get_db_connection():
    """Open a new MySQL connection configured from the settings file.

    The settings are re-read on every call via ``get_connection_details()``.
    The ``Host``, ``Port``, ``Username``, ``Password`` and ``Database``
    entries are mapped to the connector's ``host``, ``port``, ``user``,
    ``password`` and ``database`` arguments.

    Returns:
        The connection object returned by ``mysql.connector.connect``. The
        caller is responsible for closing it.
    """
    config = get_connection_details()
    candidate_args = {
        'host': config.get('Host'),
        'port': config.get('Port'),
        'user': config.get('Username'),
        'password': config.get('Password'),
        'database': config.get('Database'),
    }
    # Settings absent from the file are left out of the call instead of being
    # forwarded as explicit None values (e.g. port=None).
    # NOTE(review): this relies on the connector falling back to its own
    # defaults for omitted arguments -- confirm against the driver docs.
    connect_args = {
        name: value
        for name, value in candidate_args.items()
        if value is not None
    }
    return mysql.connector.connect(**connect_args)
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the extension list, optionally filtered by a search term.

    Reads the ``search`` query-string argument. When it is non-empty, only
    rows of the ``EXT`` table whose plant, department, csv_id, position or
    extension contains the term are selected; otherwise every row is
    selected. The ``csv_id`` column is exposed to the template as ``name``.

    Returns:
        The rendered ``index.html`` template, given the rows as ``data`` and
        the raw search term as ``search_query``.
    """
    search_query = request.args.get('search', '')

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        if search_query:
            query = ("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT WHERE plant LIKE %s OR department LIKE %s OR csv_id LIKE %s OR position LIKE %s OR extension LIKE %s")
            search_term = f"%{search_query}%"
            # The same pattern is bound once per searched column.
            cursor.execute(query, (search_term,) * 5)
        else:
            cursor.execute("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT")
        data = cursor.fetchall()
    finally:
        # Release the connection even when the query or fetch raises.
        conn.close()

    return render_template('index.html', data=data, search_query=search_query)
|
|
|
|
|
|
@app.route('/batch_upload', methods=['POST'])
def batch_upload():
    """Import an uploaded CSV file into the ``EXT`` table.

    The file is taken from the ``csv_file`` form field and decoded as Big5,
    with undecodable bytes dropped. The first row is treated as a header and
    skipped. Each later row supplies plant, department, csv_id, position and
    extension in its first five columns; rows with fewer than five columns
    are reported with ``print`` and skipped. A row whose (plant, csv_id) pair
    already exists updates that record, otherwise a new record is inserted.
    All changes are committed together, or rolled back if any step fails.

    Returns:
        A redirect to the index page when no file part or no filename was
        sent; ``{"status": "success"}`` on success; a JSON error with status
        500 if processing fails, or 400 if the upload object is falsy.
    """
    if 'csv_file' not in request.files:
        return redirect(url_for('index'))

    file = request.files['csv_file']
    if file.filename == '':
        return redirect(url_for('index'))

    if not file:
        return jsonify({"status": "error", "message": "No file uploaded or file is empty."}), 400

    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # Read the CSV file content.
        text = file.stream.read().decode('Big5', errors='ignore')
        # newline='' hands line endings to the csv module untranslated, as
        # its documentation requires; this keeps CR-only files and quoted
        # fields with embedded newlines parseable.
        csv_reader = csv.reader(io.StringIO(text, newline=''))

        next(csv_reader, None)  # Skip header row

        # Numbering starts at 2 because the header occupies row 1.
        for row_number, row in enumerate(csv_reader, start=2):
            if len(row) < 5:  # Ensure row has enough columns
                print(f"Skipping malformed row {row_number}: {row}")
                continue

            plant, department, csv_id, position, extension = row[:5]

            # Check if record exists based on plant and csv_id
            cursor.execute("SELECT id FROM EXT WHERE plant = %s AND csv_id = %s", (plant, csv_id))
            existing_record = cursor.fetchone()

            if existing_record:
                # Update existing record
                update_query = ("UPDATE EXT SET department=%s, position=%s, extension=%s WHERE id=%s")
                cursor.execute(update_query, (department, position, extension, existing_record[0]))
            else:
                # Insert new record
                insert_query = ("INSERT INTO EXT (plant, department, csv_id, position, extension) VALUES (%s, %s, %s, %s, %s)")
                cursor.execute(insert_query, (plant, department, csv_id, position, extension))

        conn.commit()
        return jsonify({"status": "success"})
    except Exception as e:
        # Request boundary: undo the partial import and report the failure.
        conn.rollback()
        return jsonify({"status": "error", "message": str(e)}), 500
    finally:
        conn.close()
|
|
|
|
@app.route('/validate_csv_data', methods=['POST'])
def validate_csv_data():
    """Preview how a batch of rows would change the ``EXT`` table.

    Expects a JSON array of objects. Each object is looked up in ``EXT`` by
    its ``plant`` and ``csv_id`` values, and its ``department``, ``position``
    and ``extension`` values are compared with the stored record. Nothing is
    written to the database.

    Returns:
        A JSON array with one entry per input row, each holding
        ``original_data`` (the input row), ``status`` (``"new"`` when no
        record matches, ``"update"`` when at least one compared field
        differs, ``"no_change"`` otherwise) and ``diff`` (a mapping of each
        differing field to its ``old`` and ``new`` values). A payload that is
        not an array of objects yields a JSON error with status 400.
    """
    data = request.get_json()
    # Reject unusable payloads before a connection is opened, so they do not
    # fail part-way through the loop.
    if not isinstance(data, list) or not all(isinstance(row, dict) for row in data):
        return jsonify({"status": "error", "message": "Expected a JSON array of row objects."}), 400

    compared_fields = ('department', 'position', 'extension')
    validated_rows = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        for row_data in data:
            cursor.execute(
                "SELECT * FROM EXT WHERE plant = %s AND csv_id = %s",
                (row_data.get('plant'), row_data.get('csv_id')),
            )
            existing_record = cursor.fetchone()

            if not existing_record:
                validated_rows.append({
                    'original_data': row_data,
                    'status': "new",
                    'diff': {},
                })
                continue

            # NOTE(review): values are compared with != as-is; if a column is
            # not stored as text, a numeric JSON/DB mismatch would be
            # reported as a change -- confirm against the table schema.
            diff = {
                field: {'old': existing_record[field], 'new': row_data.get(field)}
                for field in compared_fields
                if existing_record[field] != row_data.get(field)
            }
            validated_rows.append({
                'original_data': row_data,
                'status': "update" if diff else "no_change",
                'diff': diff,
            })
    finally:
        # Release the connection even when a lookup raises.
        conn.close()

    return jsonify(validated_rows)
|
|
|
|
@app.route('/download_template')
def download_template():
    """Send ``template.csv`` from the ``samples`` directory as an attachment."""
    directory, filename = 'samples', 'template.csv'
    return send_from_directory(directory, filename, as_attachment=True)
|
|
|
|
|
|
@app.route('/add', methods=['POST'])
def add_extension():
    """Insert one record into the ``EXT`` table from a JSON object.

    The request body must be a JSON object with the keys ``plant``,
    ``department``, ``name``, ``position`` and ``extension``. The ``name``
    value is stored in the ``csv_id`` column.

    Returns:
        ``{"status": "success"}`` once the insert is committed, or a JSON
        error with status 400 when the body is not an object or a required
        key is missing.
    """
    required_fields = ('plant', 'department', 'name', 'position', 'extension')

    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Expected a JSON object."}), 400
    missing = [field for field in required_fields if field not in data]
    if missing:
        return jsonify({"status": "error", "message": "Missing fields: " + ", ".join(missing)}), 400

    add_query = ("INSERT INTO EXT "
                 "(plant, department, csv_id, position, extension) "
                 "VALUES (%s, %s, %s, %s, %s)")
    # required_fields is ordered to match the column list above.
    data_tuple = tuple(data[field] for field in required_fields)

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(add_query, data_tuple)
        conn.commit()
    finally:
        # Release the connection even when the insert raises.
        conn.close()
    return jsonify({"status": "success"})
|
|
|
|
@app.route('/update/<int:id>', methods=['POST'])
def update_extension(id):
    """Overwrite the ``EXT`` record with the given id from a JSON object.

    The request body must be a JSON object with the keys ``plant``,
    ``department``, ``name``, ``position`` and ``extension``. The ``name``
    value is stored in the ``csv_id`` column.

    Args:
        id: Primary key of the record to update, taken from the URL. The
            name shadows the builtin but is fixed by the route pattern.

    Returns:
        ``{"status": "success"}`` once the update is committed, or a JSON
        error with status 400 when the body is not an object or a required
        key is missing.
    """
    required_fields = ('plant', 'department', 'name', 'position', 'extension')

    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Expected a JSON object."}), 400
    missing = [field for field in required_fields if field not in data]
    if missing:
        return jsonify({"status": "error", "message": "Missing fields: " + ", ".join(missing)}), 400

    update_query = ("UPDATE EXT SET plant=%s, department=%s, csv_id=%s, position=%s, extension=%s WHERE id=%s")
    # required_fields is ordered to match the SET clause; the id fills the
    # trailing WHERE placeholder.
    data_tuple = tuple(data[field] for field in required_fields) + (id,)

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(update_query, data_tuple)
        conn.commit()
    finally:
        # Release the connection even when the update raises.
        conn.close()
    return jsonify({"status": "success"})
|
|
|
|
@app.route('/delete/<int:id>', methods=['POST'])
def delete_extension(id):
    """Delete the ``EXT`` record with the given id.

    Args:
        id: Primary key of the record to delete, taken from the URL. The
            name shadows the builtin but is fixed by the route pattern.

    Returns:
        ``{"status": "success"}`` once the delete is committed.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM EXT WHERE id = %s", (id,))
        conn.commit()
    finally:
        # Release the connection even when the delete raises.
        conn.close()
    return jsonify({"status": "success"})
|
|
|
|
if __name__ == '__main__':
    # Debug mode turns on the interactive debugger, which must never be
    # reachable by untrusted clients. It is therefore opt-in through the
    # FLASK_DEBUG environment variable instead of being hard-coded on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)
|