first_project
This commit is contained in:
188
app.py
Normal file
188
app.py
Normal file
@@ -0,0 +1,188 @@
|
||||
|
||||
from flask import Flask, render_template, request, redirect, url_for, jsonify, send_from_directory
|
||||
import mysql.connector
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
|
||||
def get_connection_details():
|
||||
details = {}
|
||||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
settings_file = os.path.join(base_dir, 'setting.txt')
|
||||
with open(settings_file, 'r') as f:
|
||||
for line in f:
|
||||
key, value = line.strip().split(':', 1)
|
||||
details[key.strip()] = value.strip()
|
||||
return details
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
def get_db_connection():
|
||||
config = get_connection_details()
|
||||
conn = mysql.connector.connect(
|
||||
host=config.get('Host'),
|
||||
port=config.get('Port'),
|
||||
user=config.get('Username'),
|
||||
password=config.get('Password'),
|
||||
database=config.get('Database')
|
||||
)
|
||||
return conn
|
||||
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
search_query = request.args.get('search', '')
|
||||
if search_query:
|
||||
query = ("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT WHERE plant LIKE %s OR department LIKE %s OR csv_id LIKE %s OR position LIKE %s OR extension LIKE %s")
|
||||
search_term = f"%{search_query}%"
|
||||
cursor.execute(query, (search_term, search_term, search_term, search_term, search_term))
|
||||
else:
|
||||
cursor.execute("SELECT plant, department, csv_id AS name, position, extension, id FROM EXT")
|
||||
data = cursor.fetchall()
|
||||
conn.close()
|
||||
return render_template('index.html', data=data, search_query=search_query)
|
||||
|
||||
|
||||
@app.route('/batch_upload', methods=['POST'])
|
||||
def batch_upload():
|
||||
if 'csv_file' not in request.files:
|
||||
return redirect(url_for('index'))
|
||||
|
||||
file = request.files['csv_file']
|
||||
if file.filename == '':
|
||||
return redirect(url_for('index'))
|
||||
|
||||
if file:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Read the CSV file content
|
||||
stream = io.StringIO(file.stream.read().decode('Big5', errors='ignore'))
|
||||
csv_reader = csv.reader(stream)
|
||||
|
||||
next(csv_reader, None) # Skip header row
|
||||
|
||||
for i, row in enumerate(csv_reader):
|
||||
if len(row) < 5: # Ensure row has enough columns
|
||||
print(f"Skipping malformed row {i+2}: {row}")
|
||||
continue
|
||||
|
||||
plant = row[0]
|
||||
department = row[1]
|
||||
csv_id = row[2]
|
||||
position = row[3]
|
||||
extension = row[4]
|
||||
|
||||
# Check if record exists based on plant and csv_id
|
||||
cursor.execute("SELECT id FROM EXT WHERE plant = %s AND csv_id = %s", (plant, csv_id))
|
||||
existing_record = cursor.fetchone()
|
||||
|
||||
if existing_record:
|
||||
# Update existing record
|
||||
update_query = ("UPDATE EXT SET department=%s, position=%s, extension=%s WHERE id=%s")
|
||||
cursor.execute(update_query, (department, position, extension, existing_record[0]))
|
||||
else:
|
||||
# Insert new record
|
||||
insert_query = ("INSERT INTO EXT (plant, department, csv_id, position, extension) VALUES (%s, %s, %s, %s, %s)")
|
||||
cursor.execute(insert_query, (plant, department, csv_id, position, extension))
|
||||
|
||||
conn.commit()
|
||||
return jsonify({"status": "success"})
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return jsonify({"status": "error", "message": str(e)}), 500
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
return jsonify({"status": "error", "message": "No file uploaded or file is empty."}), 400
|
||||
|
||||
@app.route('/validate_csv_data', methods=['POST'])
|
||||
def validate_csv_data():
|
||||
data = request.get_json()
|
||||
validated_rows = []
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
for row_data in data:
|
||||
plant = row_data.get('plant')
|
||||
csv_id = row_data.get('csv_id')
|
||||
department = row_data.get('department')
|
||||
position = row_data.get('position')
|
||||
extension = row_data.get('extension')
|
||||
|
||||
cursor.execute("SELECT * FROM EXT WHERE plant = %s AND csv_id = %s", (plant, csv_id))
|
||||
existing_record = cursor.fetchone()
|
||||
|
||||
if existing_record:
|
||||
diff = {}
|
||||
status = "no_change"
|
||||
if existing_record['department'] != department:
|
||||
diff['department'] = {'old': existing_record['department'], 'new': department}
|
||||
status = "update"
|
||||
if existing_record['position'] != position:
|
||||
diff['position'] = {'old': existing_record['position'], 'new': position}
|
||||
status = "update"
|
||||
if existing_record['extension'] != extension:
|
||||
diff['extension'] = {'old': existing_record['extension'], 'new': extension}
|
||||
status = "update"
|
||||
|
||||
validated_rows.append({
|
||||
'original_data': row_data,
|
||||
'status': status,
|
||||
'diff': diff
|
||||
})
|
||||
else:
|
||||
validated_rows.append({
|
||||
'original_data': row_data,
|
||||
'status': "new",
|
||||
'diff': {}
|
||||
})
|
||||
|
||||
conn.close()
|
||||
return jsonify(validated_rows)
|
||||
|
||||
@app.route('/download_template')
|
||||
def download_template():
|
||||
return send_from_directory('samples', 'template.csv', as_attachment=True)
|
||||
|
||||
|
||||
@app.route('/add', methods=['POST'])
|
||||
def add_extension():
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
data = request.get_json()
|
||||
add_query = ("INSERT INTO EXT "
|
||||
"(plant, department, csv_id, position, extension) "
|
||||
"VALUES (%s, %s, %s, %s, %s)")
|
||||
data_tuple = (data['plant'], data['department'], data['name'], data['position'], data['extension'])
|
||||
cursor.execute(add_query, data_tuple)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return jsonify({"status": "success"})
|
||||
|
||||
@app.route('/update/<int:id>', methods=['POST'])
|
||||
def update_extension(id):
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
data = request.get_json()
|
||||
update_query = ("UPDATE EXT SET plant=%s, department=%s, csv_id=%s, position=%s, extension=%s WHERE id=%s")
|
||||
data_tuple = (data['plant'], data['department'], data['name'], data['position'], data['extension'], id)
|
||||
cursor.execute(update_query, data_tuple)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return jsonify({"status": "success"})
|
||||
|
||||
@app.route('/delete/<int:id>', methods=['POST'])
|
||||
def delete_extension(id):
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DELETE FROM EXT WHERE id = %s", (id,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return jsonify({"status": "success"})
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
Reference in New Issue
Block a user