上傳檔案到「/」

-- 指定專案資料夾
cd /employee_votes

-- gitea 初始化
git init 

-- 建立註解
git add . 
git commit -m "Initial commit" 

-- 切換到 main 分支
git branch -M main

-- 上傳檔案到 gitea
git remote add origin https://github.com/91771/<REPO>.git 
git push -u origin main
This commit is contained in:
2025-09-17 15:18:20 +08:00
commit 314474a682
5 changed files with 1251 additions and 0 deletions

6
DB_connection.txt Normal file
View File

@@ -0,0 +1,6 @@
資料庫資訊:
DB_HOST = mysql.theaken.com
DB_PORT = 33306
DB_NAME = db_A019
DB_USER = A019
DB_PASSWORD = 9wvKEkxBzVca

391
api_server.py Normal file
View File

@@ -0,0 +1,391 @@
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import mysql.connector
from mysql.connector import Error
from datetime import datetime
import re
app = Flask(__name__)
CORS(app) # 啟用CORS允許本機前端訪問
# 資料庫連接設定 - 從DB_connection.txt獲取
DB_CONFIG = {
'host': 'mysql.theaken.com',
'database': 'db_A019',
'user': 'A019',
'password': '9wvKEkxBzVca',
'port': 33306
}
def get_db_connection():
"""建立資料庫連接"""
try:
connection = mysql.connector.connect(**DB_CONFIG)
return connection
except Error as e:
print(f"資料庫連接錯誤: {e}")
return None
def validate_email(email):
"""驗證email格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def validate_age(age):
"""驗證年齡範圍"""
try:
age_int = int(age)
return 1 <= age_int <= 120
except ValueError:
return False
@app.route('/v1/menu_items', methods=['GET'])
def get_menu_items():
"""
GET /v1/menu_items - 查詢menu_items資料表
支援查詢參數: ?main_course, ?side_dish, ?addon, ?Order_Date, ?is_active
"""
try:
# 獲取查詢參數
main_course = request.args.get('main_course')
side_dish = request.args.get('side_dish')
addon = request.args.get('addon')
order_date = request.args.get('Order_Date')
is_active = request.args.get('is_active')
connection = get_db_connection()
if connection is None:
return jsonify({
'status': 'error',
'code': 500,
'message': '無法連接資料庫'
}), 500
cursor = connection.cursor(dictionary=True)
# 建立查詢條件
query = "SELECT * FROM menu_items WHERE 1=1"
params = []
if main_course:
query += " AND main_course LIKE %s"
params.append(f"%{main_course}%")
if side_dish:
query += " AND side_dish LIKE %s"
params.append(f"%{side_dish}%")
if addon:
query += " AND addon LIKE %s"
params.append(f"%{addon}%")
if order_date:
query += " AND Order_Date = %s"
params.append(order_date)
if is_active is not None:
query += " AND is_active = %s"
params.append(is_active.lower() in ['true', '1', 'yes'])
# 執行查詢
cursor.execute(query, params)
results = cursor.fetchall()
# 獲取總數用於meta資訊
count_query = "SELECT COUNT(*) as total FROM menu_items WHERE 1=1" + query.split('WHERE 1=1')[1]
cursor.execute(count_query, params)
total_count = cursor.fetchone()['total']
cursor.close()
connection.close()
return jsonify({
'status': 'success',
'code': 200,
'message': '查詢成功',
'data': results,
'meta': {
'total': total_count,
'page': 1,
'per_page': len(results),
'has_more': False
}
})
except Error as e:
return jsonify({
'status': 'error',
'code': 500,
'message': f'資料庫查詢錯誤: {str(e)}'
}), 500
@app.route('/v1/menu_items/<int:item_id>', methods=['GET'])
def get_menu_item(item_id):
"""GET /v1/menu_items/<id> - 根據ID獲取單個menu_item"""
try:
connection = get_db_connection()
if connection is None:
return jsonify({
'status': 'error',
'code': 500,
'message': '無法連接資料庫'
}), 500
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT * FROM menu_items WHERE Id = %s", (item_id,))
result = cursor.fetchone()
cursor.close()
connection.close()
if result is None:
return jsonify({
'status': 'error',
'code': 404,
'message': '找不到指定的menu_item'
}), 404
return jsonify({
'status': 'success',
'code': 200,
'message': '查詢成功',
'data': result
})
except Error as e:
return jsonify({
'status': 'error',
'code': 500,
'message': f'資料庫查詢錯誤: {str(e)}'
}), 500
@app.route('/v1/menu_items', methods=['POST'])
def create_menu_item():
"""POST /v1/menu_items - 創建新的menu_item"""
try:
data = request.get_json()
# 驗證必要欄位
required_fields = ['name', 'email', 'age']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({
'status': 'error',
'code': 400,
'message': f'缺少必要欄位: {field}'
}), 400
# 驗證email格式
if not validate_email(data['email']):
return jsonify({
'status': 'error',
'code': 400,
'message': 'email格式不正確'
}), 400
# 驗證年齡範圍
if not validate_age(data['age']):
return jsonify({
'status': 'error',
'code': 400,
'message': '年齡必須是1-120之間的數字'
}), 400
connection = get_db_connection()
if connection is None:
return jsonify({
'status': 'error',
'code': 500,
'message': '無法連接資料庫'
}), 500
cursor = connection.cursor()
# 使用參數化查詢避免SQL注入
query = """
INSERT INTO menu_items (main_course, side_dish, addon, Order_Date, is_active)
VALUES (%s, %s, %s, %s, %s)
"""
# 這裡使用範例資料,您可以根據實際需求調整
params = (
data.get('main_course', '預設主餐'),
data.get('side_dish', '預設副餐'),
data.get('addon', '預設湯品'),
datetime.now().strftime('%Y-%m-%d'),
True
)
cursor.execute(query, params)
connection.commit()
# 獲取新創建的資料ID
new_id = cursor.lastrowid
cursor.close()
connection.close()
return jsonify({
'status': 'success',
'code': 201,
'message': '創建成功',
'data': {
'id': new_id,
'main_course': params[0],
'side_dish': params[1],
'addon': params[2],
'Order_Date': params[3],
'is_active': params[4]
}
}), 201
except Error as e:
return jsonify({
'status': 'error',
'code': 500,
'message': f'資料庫操作錯誤: {str(e)}'
}), 500
@app.route('/v1/menu_items/<int:item_id>', methods=['PATCH'])
def update_menu_item(item_id):
"""PATCH /v1/menu_items/<id> - 更新menu_item的任一欄位"""
try:
data = request.get_json()
# 檢查是否有可更新的欄位
allowed_fields = ['main_course', 'side_dish', 'addon', 'Order_Date', 'is_active']
update_fields = {}
for field in allowed_fields:
if field in data:
update_fields[field] = data[field]
if not update_fields:
return jsonify({
'status': 'error',
'code': 400,
'message': '沒有提供可更新的欄位'
}), 400
connection = get_db_connection()
if connection is None:
return jsonify({
'status': 'error',
'code': 500,
'message': '無法連接資料庫'
}), 500
cursor = connection.cursor(dictionary=True)
# 檢查資料是否存在
cursor.execute("SELECT * FROM menu_items WHERE Id = %s", (item_id,))
if cursor.fetchone() is None:
cursor.close()
connection.close()
return jsonify({
'status': 'error',
'code': 404,
'message': '找不到指定的menu_item'
}), 404
# 建立更新查詢
set_clause = ", ".join([f"{field} = %s" for field in update_fields.keys()])
query = f"UPDATE menu_items SET {set_clause} WHERE Id = %s"
params = list(update_fields.values())
params.append(item_id)
cursor.execute(query, params)
connection.commit()
# 獲取更新後的資料
cursor.execute("SELECT * FROM menu_items WHERE Id = %s", (item_id,))
updated_data = cursor.fetchone()
cursor.close()
connection.close()
return jsonify({
'status': 'success',
'code': 200,
'message': '更新成功',
'data': updated_data
})
except Error as e:
return jsonify({
'status': 'error',
'code': 500,
'message': f'資料庫操作錯誤: {str(e)}'
}), 500
@app.route('/v1/menu_items/<int:item_id>', methods=['DELETE'])
def delete_menu_item(item_id):
"""DELETE /v1/menu_items/<id> - 刪除menu_item"""
try:
connection = get_db_connection()
if connection is None:
return jsonify({
'status': 'error',
'code': 500,
'message': '無法連接資料庫'
}), 500
cursor = connection.cursor()
# 檢查資料是否存在
cursor.execute("SELECT * FROM menu_items WHERE Id = %s", (item_id,))
if cursor.fetchone() is None:
cursor.close()
connection.close()
return jsonify({
'status': 'error',
'code': 404,
'message': '找不到指定的menu_item'
}), 404
# 執行刪除
cursor.execute("DELETE FROM menu_items WHERE Id = %s", (item_id,))
connection.commit()
cursor.close()
connection.close()
return '', 204
except Error as e:
return jsonify({
'status': 'error',
'code': 500,
'message': f'資料庫操作錯誤: {str(e)}'
}), 500
@app.errorhandler(404)
def not_found(error):
return jsonify({
'status': 'error',
'code': 404,
'message': 'API端點不存在'
}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({
'status': 'error',
'code': 500,
'message': '伺服器內部錯誤'
}), 500
@app.route('/menu-form')
def menu_form():
return render_template('menu_form.html')
if __name__ == '__main__':
print("啟動Flask API伺服器...")
print("可用端點:")
print(" GET /v1/menu_items")
print(" GET /v1/menu_items/<id>")
print(" POST /v1/menu_items")
print(" PATCH /v1/menu_items/<id>")
print(" DELETE /v1/menu_items/<id>")
app.run(debug=True, host='0.0.0.0', port=5000)

304
app_backup.py Normal file
View File

@@ -0,0 +1,304 @@
from flask import Flask, render_template, request, jsonify
import mysql.connector
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
import re
import urllib3
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
app = Flask(__name__)
# Database configuration from DB_connection.txt
DB_CONFIG = {
'host': 'mysql.theaken.com',
'port': 33306,
'database': 'db_A019',
'user': 'A019',
'password': '9wvKEkxBzVca'
}
def get_db_connection():
"""Establish database connection"""
try:
conn = mysql.connector.connect(**DB_CONFIG)
return conn
except mysql.connector.Error as e:
print(f"Database connection error: {e}")
return None
def create_menu_table():
"""Create menu_items table if not exists"""
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS menu_items (
Id INTEGER AUTO_INCREMENT PRIMARY KEY,
main_course VARCHAR(50) NOT NULL,
side_dish VARCHAR(50),
addon VARCHAR(50),
is_active BOOLEAN NOT NULL
)
""")
conn.commit()
print("Menu table created or already exists")
except mysql.connector.Error as e:
print(f"Error creating table: {e}")
finally:
conn.close()
def scrape_menu_data():
"""Scrape menu data from the target URL"""
url = "https://club.panjit.com.tw/back/menu/menu.php?1672970133&indexselectid=1"
try:
# Bypass SSL verification for the target website
response = requests.get(url, timeout=10, verify=False)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# 根據實際網站結構解析菜單資料
menu_items = []
# 查找菜單表格或列表
menu_tables = soup.find_all('table')
menu_lists = soup.find_all(['ul', 'ol'])
# 如果找到表格結構
if menu_tables:
for table in menu_tables:
rows = table.find_all('tr')
for row in rows:
cells = row.find_all('td')
if len(cells) >= 2: # 至少有名稱和價格
name = cells[0].get_text(strip=True)
price_text = cells[1].get_text(strip=True)
# 提取價格數字
price = 0.0
try:
# 從文字中提取數字
price_match = re.search(r'\d+', price_text)
if price_match:
price = float(price_match.group())
except:
pass
if name and price > 0:
menu_items.append({
'name': name,
'category': '主餐',
'price': price,
'description': f'{name} - 美味餐點',
'image_url': f'/static/images/{name.replace(" ", "_").lower()}.jpg'
})
# 如果找到列表結構
elif menu_lists:
for menu_list in menu_lists:
items = menu_list.find_all('li')
for item in items:
text = item.get_text(strip=True)
if text and any(char.isdigit() for char in text):
# 嘗試解析項目名稱和價格
# 尋找價格模式
price_match = re.search(r'(\$|NT\$|\$)?\s*(\d+)', text)
if price_match:
price = float(price_match.group(2))
name = re.sub(r'(\$|NT\$|\$)?\s*\d+', '', text).strip()
if name and price > 0:
menu_items.append({
'name': name,
'category': '餐點',
'price': price,
'description': f'{name} - 精選美食',
'image_url': f'/static/images/{name.replace(" ", "_").lower()}.jpg'
})
# 如果以上方法都沒找到使用備用方案搜尋包含菜單文字的div或span
if not menu_items:
# 尋找可能包含菜單項目的元素
possible_menu_elements = soup.find_all(['div', 'span', 'p'])
for elem in possible_menu_elements:
text = elem.get_text(strip=True)
if text and len(text) > 3 and any(char.isdigit() for char in text):
price_match = re.search(r'(\$|NT\$|\$)?\s*(\d+)', text)
if price_match:
price = float(price_match.group(2))
name = re.sub(r'(\$|NT\$|\$)?\s*\d+.*', '', text).strip()
if name and price > 0 and len(name) > 1:
menu_items.append({
'name': name,
'category': '餐點',
'price': price,
'description': f'{name} - 餐廳推薦',
'image_url': f'/static/images/{name.replace(" ", "_").lower()}.jpg'
})
# 如果仍然沒有找到菜單項目,使用範例資料
if not menu_items:
menu_items = [
{
'name': '經典牛肉麵',
'category': '主食',
'price': 120.00,
'description': '傳統台灣牛肉麵,湯頭濃郁',
'image_url': '/static/images/beef_noodle.jpg'
},
{
'name': '雞腿飯',
'category': '主食',
'price': 95.00,
'description': '香煎雞腿配時蔬',
'image_url': '/static/images/chicken_rice.jpg'
},
{
'name': '蔬菜沙拉',
'category': '前菜',
'price': 65.00,
'description': '新鮮時蔬搭配特製醬料',
'image_url': '/static/images/salad.jpg'
},
{
'name': '紅燒獅子頭',
'category': '主餐',
'price': 150.00,
'description': '傳統江浙菜,肉質鮮嫩',
'image_url': '/static/images/lion_head.jpg'
},
{
'name': '炒青菜',
'category': '蔬菜',
'price': 60.00,
'description': '時令蔬菜清炒',
'image_url': '/static/images/vegetable.jpg'
}
]
return menu_items
except requests.RequestException as e:
print(f"Error scraping menu data: {e}")
# 返回範例資料作為備用
return [
{
'name': '經典牛肉麵',
'category': '主食',
'price': 120.00,
'description': '傳統台灣牛肉麵,湯頭濃郁',
'image_url': '/static/images/beef_noodle.jpg'
},
{
'name': '雞腿飯',
'category': '主食',
'price': 95.00,
'description': '香煎雞腿配時蔬',
'image_url': '/static/images/chicken_rice.jpg'
}
]
def insert_menu_data():
"""Insert sample menu items into database"""
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# Clear existing data
cursor.execute("DELETE FROM menu_items")
# Insert sample data that matches the new table structure
sample_menu_items = [
('經典牛肉麵', '手工麵條', '酸菜湯', True),
('香煎雞腿飯', '時令蔬菜', '玉米濃湯', True),
('紅燒獅子頭', '白飯', '紫菜蛋花湯', True),
('蔬菜炒麵', None, '味噌湯', True),
('烤鮭魚排', '馬鈴薯泥', '南瓜湯', True)
]
insert_query = """
INSERT INTO menu_items (main_course, side_dish, addon, is_active)
VALUES (%s, %s, %s, %s)
"""
for item in sample_menu_items:
cursor.execute(insert_query, item)
conn.commit()
print(f"Inserted {len(sample_menu_items)} menu items")
except mysql.connector.Error as e:
print(f"Error inserting menu data: {e}")
finally:
conn.close()
@app.route('/')
def index():
"""Main page - display menu items"""
conn = get_db_connection()
menu_items = []
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM menu_items ORDER BY category, name")
menu_items = cursor.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching menu items: {e}")
finally:
conn.close()
return render_template('index.html', menu_items=menu_items)
@app.route('/api/menu')
def api_menu():
"""API endpoint to get menu items"""
conn = get_db_connection()
menu_items = []
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM menu_items ORDER BY category, name")
menu_items = cursor.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching menu items: {e}")
finally:
conn.close()
return jsonify(menu_items)
@app.route('/init')
def initialize():
"""Initialize database with menu data"""
create_menu_table()
insert_menu_data()
return "Database initialized with sample menu data"
if __name__ == '__main__':
# Initialize database on startup
create_menu_table()
# Check if menu items exist, if not, insert sample data
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM menu_items")
count = cursor.fetchone()[0]
if count == 0:
insert_menu_data()
except mysql.connector.Error as e:
print(f"Error checking menu items: {e}")
finally:
conn.close()
app.run(debug=True, port=5000)

250
employee_votes_api.py Normal file
View File

@@ -0,0 +1,250 @@
from flask import Flask, request, jsonify
from flask_cors import CORS
import mysql.connector
app = Flask(__name__)
CORS(app, resources={r"/v1/*": {"origins": "http://localhost:3000"}})
# Database connection details from DB_connection.txt
DB_CONFIG = {
'host': 'mysql.theaken.com',
'port': 33306,
'database': 'db_A019',
'user': 'A019',
'password': '9wvKEkxBzVca'
}
def get_db_connection():
try:
conn = mysql.connector.connect(**DB_CONFIG)
return conn
except mysql.connector.Error as err:
print(f"Error connecting to database: {err}")
return None
def format_response(status, code, message, data=None):
return jsonify({
"status": status,
"code": code,
"message": message,
"data": data
}), code
def format_error(code, message):
return jsonify({
"status": "error",
"code": code,
"message": message
}), code
@app.route('/v1/employee_votes', methods=['GET'])
def get_employee_votes():
conn = get_db_connection()
if conn is None:
return format_error(500, "Database connection failed")
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM employee_votes WHERE 1=1"
params = []
if 'menu_item_id' in request.args:
query += " AND menu_item_id = %s"
params.append(request.args['menu_item_id'])
if 'emp_id' in request.args:
query += " AND emp_id = %s"
params.append(request.args['emp_id'])
if 'emp_name' in request.args:
query += " AND emp_name = %s"
params.append(request.args['emp_name'])
if 'Order_Date' in request.args:
query += " AND Order_Date = %s"
params.append(request.args['Order_Date'])
if 'is_active' in request.args:
query += " AND is_active = %s"
params.append(request.args['is_active'])
try:
cursor.execute(query, tuple(params))
employee_votes = cursor.fetchall()
meta = {"count": len(employee_votes)}
return format_response("success", 200, "Employee votes retrieved successfully", {"employee_votes": employee_votes, "meta": meta})
except mysql.connector.Error as err:
return format_error(500, f"Error retrieving employee votes: {err}")
finally:
cursor.close()
conn.close()
@app.route('/v1/employee_votes/<int:id>', methods=['GET'])
def get_employee_vote_by_id(id):
conn = get_db_connection()
if conn is None:
return format_error(500, "Database connection failed")
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM employee_votes WHERE id = %s"
try:
cursor.execute(query, (id,))
employee_vote = cursor.fetchone()
if employee_vote:
return format_response("success", 200, "Employee vote retrieved successfully", employee_vote)
else:
return format_error(404, "Employee vote not found")
except mysql.connector.Error as err:
return format_error(500, f"Error retrieving employee vote: {err}")
finally:
cursor.close()
conn.close()
@app.route('/v1/employee_votes', methods=['POST'])
def create_employee_vote():
conn = get_db_connection()
if conn is None:
return format_error(500, "Database connection failed")
cursor = conn.cursor(dictionary=True)
data = request.get_json()
if not data:
return format_error(400, "Invalid JSON data")
required_fields = ['menu_item_id', 'emp_id', 'emp_name', 'Order_Date']
if not all(field in data for field in required_fields):
return format_error(400, "Missing required fields")
menu_item_id = data['menu_item_id']
emp_id = data['emp_id']
emp_name = data['emp_name']
order_date = data['Order_Date']
insert_query = "INSERT INTO employee_votes (menu_item_id, emp_id, emp_name, Order_Date) VALUES (%s, %s, %s, %s)"
try:
cursor.execute(insert_query, (menu_item_id, emp_id, emp_name, order_date))
conn.commit()
new_id = cursor.lastrowid
new_vote = {"id": new_id, "menu_item_id": menu_item_id, "emp_id": emp_id, "emp_name": emp_name, "Order_Date": order_date, "is_active": 1}
return format_response("success", 201, "Employee vote created successfully", new_vote)
except mysql.connector.Error as err:
conn.rollback()
return format_error(500, f"Error creating employee vote: {err}")
finally:
cursor.close()
conn.close()
@app.route('/v1/employee_votes/<int:id>', methods=['PATCH'])
def update_employee_vote(id):
conn = get_db_connection()
if conn is None:
return format_error(500, "Database connection failed")
cursor = conn.cursor(dictionary=True)
data = request.get_json()
if not data:
return format_error(400, "Invalid JSON data")
update_fields = []
params = []
if 'menu_item_id' in data:
update_fields.append("menu_item_id = %s")
params.append(data['menu_item_id'])
if 'is_active' in data:
update_fields.append("is_active = %s")
params.append(data['is_active'])
if not update_fields:
return format_error(400, "No fields to update")
update_query = "UPDATE employee_votes SET " + ", ".join(update_fields) + " WHERE id = %s"
params.append(id)
try:
cursor.execute(update_query, tuple(params))
conn.commit()
if cursor.rowcount == 0:
return format_error(404, "Employee vote not found")
# Retrieve updated data
cursor.execute("SELECT * FROM employee_votes WHERE id = %s", (id,))
updated_vote = cursor.fetchone()
return format_response("success", 200, "Employee vote updated successfully", updated_vote)
except mysql.connector.Error as err:
conn.rollback()
return format_error(500, f"Error updating employee vote: {err}")
finally:
cursor.close()
conn.close()
@app.route('/v1/employee_votes/by_emp_date', methods=['PATCH'])
def update_employee_vote_by_emp_date():
conn = get_db_connection()
if conn is None:
return format_error(500, "Database connection failed")
cursor = conn.cursor(dictionary=True)
data = request.get_json()
if not data:
return format_error(400, "Invalid JSON data")
required_fields = ['emp_id', 'Order_Date']
if not all(field in data for field in required_fields):
return format_error(400, "Missing required fields: emp_id and Order_Date")
emp_id = data['emp_id']
order_date = data['Order_Date']
update_fields = []
params = []
if 'menu_item_id' in data:
update_fields.append("menu_item_id = %s")
params.append(data['menu_item_id'])
if 'is_active' in data:
update_fields.append("is_active = %s")
params.append(data['is_active'])
if not update_fields:
return format_error(400, "No fields to update")
update_query = "UPDATE employee_votes SET " + ", ".join(update_fields) + " WHERE emp_id = %s AND Order_Date = %s"
params.append(emp_id)
params.append(order_date)
try:
cursor.execute(update_query, tuple(params))
conn.commit()
if cursor.rowcount == 0:
return format_error(404, "Employee vote not found for the given emp_id and Order_Date")
# Retrieve updated data
cursor.execute("SELECT * FROM employee_votes WHERE emp_id = %s AND Order_Date = %s", (emp_id, order_date))
updated_votes = cursor.fetchall()
return format_response("success", 200, "Employee vote updated successfully", updated_votes)
except mysql.connector.Error as err:
conn.rollback()
return format_error(500, f"Error updating employee vote: {err}")
finally:
cursor.close()
conn.close()
@app.route('/v1/employee_votes/<int:id>', methods=['DELETE'])
def delete_employee_vote(id):
conn = get_db_connection()
if conn is None:
return format_error(500, "Database connection failed")
cursor = conn.cursor(dictionary=True)
delete_query = "DELETE FROM employee_votes WHERE id = %s"
try:
cursor.execute(delete_query, (id,))
conn.commit()
if cursor.rowcount == 0:
return format_error(404, "Employee vote not found")
return format_response("success", 204, "Employee vote deleted successfully")
except mysql.connector.Error as err:
conn.rollback()
return format_error(500, f"Error deleting employee vote: {err}")
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
app.run(debug=True, port=5000)

300
temp_app.py Normal file
View File

@@ -0,0 +1,300 @@
from flask import Flask, render_template, request, jsonify
import mysql.connector
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
import re
import urllib3
# 禁用SSL警<4C>?
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
app = Flask(__name__)
# Database configuration from DB_connection.txt
DB_CONFIG = {
'host': 'mysql.theaken.com',
'port': 33306,
'database': 'db_A019',
'user': 'A019',
'password': '9wvKEkxBzVca'
}
def get_db_connection():
"""Establish database connection"""
try:
conn = mysql.connector.connect(**DB_CONFIG)
return conn
except mysql.connector.Error as e:
print(f"Database connection error: {e}")
return None
def create_menu_table():
"""Create menu_items table if not exists"""
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS menu_items (
Id INTEGER AUTO_INCREMENT PRIMARY KEY,
main_course VARCHAR(50) NOT NULL,
side_dish VARCHAR(50),
addon VARCHAR(50),
is_active BOOLEAN NOT NULL
)
""")
conn.commit()
print("Menu table created or already exists")
except mysql.connector.Error as e:
print(f"Error creating table: {e}")
finally:
conn.close()
def scrape_menu_data():
"""Scrape menu data from the target URL"""
url = "https://club.panjit.com.tw/back/menu/menu.php?1672970133&indexselectid=1"
try:
# Bypass SSL verification for the target website
response = requests.get(url, timeout=10, verify=False)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# ?<3F><>?實<>?網<>?結<>?<3F><>???<3F>單資<E596AE>?
menu_items = []
# ?<3F>找?<3F>單表格?<3F><>?<3F><>? menu_tables = soup.find_all('table')
menu_lists = soup.find_all(['ul', 'ol'])
# 如<>??<3F>到表格結<E6A0BC>?
if menu_tables:
for table in menu_tables:
rows = table.find_all('tr')
for row in rows:
cells = row.find_all('td')
if len(cells) >= 2: # ?<3F><>??<3F><>?稱<>??<3F>
name = cells[0].get_text(strip=True)
price_text = cells[1].get_text(strip=True)
# ?<3F><>??<3F>格?<3F><>?
price = 0.0
try:
# 從<>?字中?<3F><>??<3F><>?
price_match = re.search(r'\d+', price_text)
if price_match:
price = float(price_match.group())
except:
pass
if name and price > 0:
menu_items.append({
'name': name,
'category': '<EFBFBD>?',
'price': price,
'description': f'{name} - 美味餐<E591B3>?',
'image_url': f'/static/images/{name.replace(" ", "_").lower()}.jpg'
})
# 如<>??<3F>到?<3F>表結<E8A1A8>?
elif menu_lists:
for menu_list in menu_lists:
items = menu_list.find_all('li')
for item in items:
text = item.get_text(strip=True)
if text and any(char.isdigit() for char in text):
# ?<3F><EFBFBD><E8A9A6>???<3F>目?<3F>稱?<3F>價?? # 尋找?<3F>格模<E6A0BC>?
price_match = re.search(r'(\$|NT\$|\$)?\s*(\d+)', text)
if price_match:
price = float(price_match.group(2))
name = re.sub(r'(\$|NT\$|\$)?\s*\d+', '', text).strip()
if name and price > 0:
menu_items.append({
'name': name,
'category': '<EFBFBD>?',
'price': price,
'description': f'{name} - 精選美<E981B8>?',
'image_url': f'/static/images/{name.replace(" ", "_").lower()}.jpg'
})
# 如<>?以<>??<3F><>??<3F><>??<3F>使?<3F><>??<3F>方案<E696B9>??<3F><>??<3F>含?<3F>單?<3F><>??<3F>div?<3F>span
if not menu_items:
# 尋找?<3F>能?<3F>含?<3F>單?<3F>目?<3F><>?<3F><>? possible_menu_elements = soup.find_all(['div', 'span', 'p'])
for elem in possible_menu_elements:
text = elem.get_text(strip=True)
if text and len(text) > 3 and any(char.isdigit() for char in text):
price_match = re.search(r'(\$|NT\$|\$)?\s*(\d+)', text)
if price_match:
price = float(price_match.group(2))
name = re.sub(r'(\$|NT\$|\$)?\s*\d+.*', '', text).strip()
if name and price > 0 and len(name) > 1:
menu_items.append({
'name': name,
'category': '<EFBFBD>?',
'price': price,
'description': f'{name} - 餐廳?<3F>',
'image_url': f'/static/images/{name.replace(" ", "_").lower()}.jpg'
})
# 如<>?仍然沒<E784B6>??<3F>到?<3F>單?<3F>使?<3F><>?例<>??? if not menu_items:
menu_items = [
{
'name': '經典?<3F><>?<3F><>?,
'category': '<EFBFBD>?',
'price': 120.00,
'description': '?<3F>統?<3F>灣?<3F><>?麵<>?湯頭濃<E9A0AD>?',
'image_url': '/static/images/beef_noodle.jpg'
},
{
'name': '?<3F><EFBFBD><E885BF>?,
'category': '<EFBFBD>?',
'price': 95.00,
'description': '<EFBFBD>??<3F>腿?<3F><>???,
'image_url': '/static/images/chicken_rice.jpg'
},
{
'name': '?<3F><>?沙<>?',
'category': '?<3F><>?',
'price': 65.00,
'description': '?<3F>鮮?<3F>蔬?<3F><>??<3F>製?<3F><>?',
'image_url': '/static/images/salad.jpg'
},
{
'name': '<EFBFBD>??<3F><>???,
'category': '<EFBFBD>?',
'price': 150.00,
'description': '?<3F>統江<E7B5B1>??<3F><>??<3F>質鮮嫩',
'image_url': '/static/images/lion_head.jpg'
},
{
'name': '?<3F><>???,
'category': '?<3F><>?',
'price': 60.00,
'description': '?<3F>令?<3F><>?清<>?',
'image_url': '/static/images/vegetable.jpg'
}
]
return menu_items
except requests.RequestException as e:
print(f"Error scraping menu data: {e}")
# 返<>?範<>?資<>?作為?<3F>
return [
{
'name': '經典?<3F><>?<3F><>?,
'category': '<EFBFBD>?',
'price': 120.00,
'description': '?<3F>統?<3F>灣?<3F><>?麵<>?湯頭濃<E9A0AD>?',
'image_url': '/static/images/beef_noodle.jpg'
},
{
'name': '?<3F><EFBFBD><E885BF>?,
'category': '<EFBFBD>?',
'price': 95.00,
'description': '<EFBFBD>??<3F>腿?<3F><>???,
'image_url': '/static/images/chicken_rice.jpg'
}
]
def insert_menu_data():
"""Insert sample menu items into database"""
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# Clear existing data
cursor.execute("DELETE FROM menu_items")
# Insert sample data that matches the new table structure
sample_menu_items = [
('經典?<3F><>?<3F><>?, '?<EFBFBD>工麵<EFBFBD>?', '?<EFBFBD><EFBFBD>?<EFBFBD><EFBFBD>?, True),
('<EFBFBD>??<3F><EFBFBD><E885BF>?, '?<EFBFBD>?<EFBFBD><EFBFBD>?', '?<EFBFBD>米濃湯', True),
('<EFBFBD>??<3F><>???, '?<EFBFBD>', '<EFBFBD>??<EFBFBD><EFBFBD><EFBFBD>?, True),
('?<3F><>??<3F>', None, '?<3F><>?<3F><>?, True),
('?<3F>鮭魚<E9AEAD>?', '馬鈴?<3F>', '?<3F><>?<3F><>?, True)
]
insert_query = """
INSERT INTO menu_items (main_course, side_dish, addon, is_active)
VALUES (%s, %s, %s, %s)
"""
for item in sample_menu_items:
cursor.execute(insert_query, item)
conn.commit()
print(f"Inserted {len(sample_menu_items)} menu items")
except mysql.connector.Error as e:
print(f"Error inserting menu data: {e}")
finally:
conn.close()
@app.route('/')
def index():
"""Main page - display menu items"""
conn = get_db_connection()
menu_items = []
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM menu_items ORDER BY category, name")
menu_items = cursor.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching menu items: {e}")
finally:
conn.close()
return render_template('index.html', menu_items=menu_items)
@app.route('/api/menu')
def api_menu():
"""API endpoint to get menu items"""
conn = get_db_connection()
menu_items = []
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM menu_items ORDER BY category, name")
menu_items = cursor.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching menu items: {e}")
finally:
conn.close()
return jsonify(menu_items)
@app.route('/init')
def initialize():
"""Initialize database with menu data"""
create_menu_table()
insert_menu_data()
return "Database initialized with sample menu data"
if __name__ == '__main__':
# Initialize database on startup
create_menu_table()
# Check if menu items exist, if not, insert sample data
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM menu_items")
count = cursor.fetchone()[0]
if count == 0:
insert_menu_data()
except mysql.connector.Error as e:
print(f"Error checking menu items: {e}")
finally:
conn.close()
app.run(debug=True, port=5000)