Files
ver1_ext/app.py
2025-07-25 15:47:50 +08:00

189 lines
6.7 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, jsonify, send_from_directory
import mysql.connector
import csv
import io
import os
def get_connection_details():
details = {}
base_dir = os.path.dirname(os.path.abspath(__file__))
settings_file = os.path.join(base_dir, 'setting.txt')
with open(settings_file, 'r') as f:
for line in f:
key, value = line.strip().split(':', 1)
details[key.strip()] = value.strip()
return details
app = Flask(__name__)
def get_db_connection():
config = get_connection_details()
conn = mysql.connector.connect(
host=config.get('Host'),
port=config.get('Port'),
user=config.get('Username'),
password=config.get('Password'),
database=config.get('Database')
)
return conn
@app.route('/')
def index():
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
search_query = request.args.get('search', '')
if search_query:
query = ("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT WHERE plant LIKE %s OR department LIKE %s OR csv_id LIKE %s OR position LIKE %s OR extension LIKE %s")
search_term = f"%{search_query}%"
cursor.execute(query, (search_term, search_term, search_term, search_term, search_term))
else:
cursor.execute("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT")
data = cursor.fetchall()
conn.close()
return render_template('index.html', data=data, search_query=search_query)
@app.route('/batch_upload', methods=['POST'])
def batch_upload():
if 'csv_file' not in request.files:
return redirect(url_for('index'))
file = request.files['csv_file']
if file.filename == '':
return redirect(url_for('index'))
if file:
conn = get_db_connection()
cursor = conn.cursor()
try:
# Read the CSV file content
stream = io.StringIO(file.stream.read().decode('Big5', errors='ignore'))
csv_reader = csv.reader(stream)
next(csv_reader, None) # Skip header row
for i, row in enumerate(csv_reader):
if len(row) < 5: # Ensure row has enough columns
print(f"Skipping malformed row {i+2}: {row}")
continue
plant = row[0]
department = row[1]
csv_id = row[2]
position = row[3]
extension = row[4]
# Check if record exists based on plant and csv_id
cursor.execute("SELECT id FROM EXT WHERE plant = %s AND csv_id = %s", (plant, csv_id))
existing_record = cursor.fetchone()
if existing_record:
# Update existing record
update_query = ("UPDATE EXT SET department=%s, position=%s, extension=%s WHERE id=%s")
cursor.execute(update_query, (department, position, extension, existing_record[0]))
else:
# Insert new record
insert_query = ("INSERT INTO EXT (plant, department, csv_id, position, extension) VALUES (%s, %s, %s, %s, %s)")
cursor.execute(insert_query, (plant, department, csv_id, position, extension))
conn.commit()
return jsonify({"status": "success"})
except Exception as e:
conn.rollback()
return jsonify({"status": "error", "message": str(e)}), 500
finally:
conn.close()
return jsonify({"status": "error", "message": "No file uploaded or file is empty."}), 400
@app.route('/validate_csv_data', methods=['POST'])
def validate_csv_data():
data = request.get_json()
validated_rows = []
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
for row_data in data:
plant = row_data.get('plant')
csv_id = row_data.get('csv_id')
department = row_data.get('department')
position = row_data.get('position')
extension = row_data.get('extension')
cursor.execute("SELECT * FROM EXT WHERE plant = %s AND csv_id = %s", (plant, csv_id))
existing_record = cursor.fetchone()
if existing_record:
diff = {}
status = "no_change"
if existing_record['department'] != department:
diff['department'] = {'old': existing_record['department'], 'new': department}
status = "update"
if existing_record['position'] != position:
diff['position'] = {'old': existing_record['position'], 'new': position}
status = "update"
if existing_record['extension'] != extension:
diff['extension'] = {'old': existing_record['extension'], 'new': extension}
status = "update"
validated_rows.append({
'original_data': row_data,
'status': status,
'diff': diff
})
else:
validated_rows.append({
'original_data': row_data,
'status': "new",
'diff': {}
})
conn.close()
return jsonify(validated_rows)
@app.route('/download_template')
def download_template():
return send_from_directory('samples', 'template.csv', as_attachment=True)
@app.route('/add', methods=['POST'])
def add_extension():
conn = get_db_connection()
cursor = conn.cursor()
data = request.get_json()
add_query = ("INSERT INTO EXT "
"(plant, department, csv_id, position, extension) "
"VALUES (%s, %s, %s, %s, %s)")
data_tuple = (data['plant'], data['department'], data['name'], data['position'], data['extension'])
cursor.execute(add_query, data_tuple)
conn.commit()
conn.close()
return jsonify({"status": "success"})
@app.route('/update/<int:id>', methods=['POST'])
def update_extension(id):
conn = get_db_connection()
cursor = conn.cursor()
data = request.get_json()
update_query = ("UPDATE EXT SET plant=%s, department=%s, csv_id=%s, position=%s, extension=%s WHERE id=%s")
data_tuple = (data['plant'], data['department'], data['name'], data['position'], data['extension'], id)
cursor.execute(update_query, data_tuple)
conn.commit()
conn.close()
return jsonify({"status": "success"})
@app.route('/delete/<int:id>', methods=['POST'])
def delete_extension(id):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM EXT WHERE id = %s", (id,))
conn.commit()
conn.close()
return jsonify({"status": "success"})
if __name__ == '__main__':
app.run(debug=True)