# File: 0908_--USER--/app.py
# Timestamp: 2025-09-08 16:21:07 +08:00
# 286 lines, 9.1 KiB, Python

import os
from flask import Flask, jsonify, request
from flask_cors import CORS
import mysql.connector
from mysql.connector import Error
# --- Database Configuration ---
# Every connection setting is looked up in the process environment first; the
# literal next to it is only the fallback used when the variable is unset.
# NOTE(review): the fallbacks below embed a host, account and password in the
# source file. Prefer supplying DB_* through the environment and rotating the
# embedded password.
DB_HOST = os.environ.get("DB_HOST", "mysql.theaken.com")
DB_PORT = int(os.environ.get("DB_PORT", "33306"))  # always an int, whatever the source
DB_NAME = os.environ.get("DB_NAME", "db_A027")
DB_USER = os.environ.get("DB_USER", "A027")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "E1CelfxqlKoj")
# Table name is interpolated into SQL text by the route handlers, so it must
# stay a trusted constant and never come from request data.
DB_TABLE = "pizzas"
# --- Flask App Initialization ---
# Module-level application object; the @app.route decorators in this file
# register their handlers on it.
app = Flask(__name__)
# CORS(app) is called with no per-route options, so it applies to all routes;
# this is what lets a locally served frontend call the API during development.
CORS(app) # Enable CORS for all routes, allowing local frontend development
# --- Database Connection ---
def get_db_connection():
    """Open a new MySQL connection using the module-level DB_* settings.

    Returns:
        The connection object on success, or ``None`` when
        ``mysql.connector.connect`` raises ``Error`` (the error text is
        printed to stdout in that case).
    """
    settings = {
        "host": DB_HOST,
        "port": DB_PORT,
        "database": DB_NAME,
        "user": DB_USER,
        "password": DB_PASSWORD,
    }
    try:
        return mysql.connector.connect(**settings)
    except Error as exc:
        print(f"Error connecting to MySQL database: {exc}")
    # Callers test the result for falsiness to detect a failed connection.
    return None
# --- Response Formatting Helpers ---
def create_success_response(data, message="Success", code=200, meta=None):
    """Build the standard success envelope and pair it with its HTTP status.

    Args:
        data: Payload placed under ``"data"``; omitted from the body when None.
        message: Human-readable summary.
        code: HTTP status, echoed inside the body and returned as the status.
        meta: Optional extra section (e.g. pagination); omitted when None.

    Returns:
        A ``(jsonify(...), code)`` tuple suitable for returning from a view.
    """
    payload = {"status": "success", "code": code, "message": message}
    # Optional sections are only emitted when the caller actually supplied them.
    optional_sections = (("data", data), ("meta", meta))
    payload.update(
        {key: value for key, value in optional_sections if value is not None}
    )
    return jsonify(payload), code
def create_error_response(message, code=400):
    """Build the standard error envelope and pair it with its HTTP status.

    Args:
        message: Description of what went wrong.
        code: HTTP status, echoed inside the body and returned as the status.

    Returns:
        A ``(jsonify(...), code)`` tuple suitable for returning from a view.
    """
    body = dict(status="error", code=code, message=message)
    return jsonify(body), code
# --- API Routes ---
@app.route('/v1/pizzas', methods=['GET'])
def get_pizzas():
    """Retrieve a list of pizzas with optional filtering and pagination.

    Query parameters:
        page: 1-based page number (default 1).
        limit: Page size (default 10); must be at least 1.
        min_age: Optional inclusive lower bound on ``age``.
        max_age: Optional inclusive upper bound on ``age``.

    Returns:
        200 with the rows under ``data`` and pagination details under
        ``meta``; 400 when a query parameter is invalid; 500 when the
        database cannot be reached or a query fails.
    """
    # Validate every query parameter before touching the database, so a bad
    # request never opens a connection that would then have to be cleaned up.
    try:
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 10))
    except ValueError:
        return create_error_response("Invalid 'page' or 'limit' parameter. Must be integers.")
    # limit == 0 would divide by zero in total_pages; negative values would
    # produce a negative LIMIT/OFFSET that MySQL rejects.
    if page < 1 or limit < 1:
        return create_error_response("Invalid 'page' or 'limit' parameter. Must be positive integers.")
    offset = (page - 1) * limit

    where_clauses = []
    query_params = []
    for arg_name, clause in (('min_age', "age >= %s"), ('max_age', "age <= %s")):
        raw_value = request.args.get(arg_name)
        if not raw_value:
            continue  # absent or empty filter: no constraint
        try:
            query_params.append(int(raw_value))
        except ValueError:
            return create_error_response(f"Invalid '{arg_name}' parameter. Must be an integer.")
        where_clauses.append(clause)

    # Only the trusted DB_TABLE constant and fixed clause text are interpolated;
    # all user-supplied values travel as bound parameters.
    base_query = f"FROM {DB_TABLE}"
    if where_clauses:
        base_query += " WHERE " + " AND ".join(where_clauses)
    count_query = f"SELECT COUNT(*) as total {base_query}"
    data_query = f"SELECT id, name, email, age {base_query} ORDER BY id ASC LIMIT %s OFFSET %s"

    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        # Total count (same filters, no paging) feeds the pagination metadata.
        cursor.execute(count_query, tuple(query_params))
        total_records = cursor.fetchone()['total']
        cursor.execute(data_query, tuple(query_params + [limit, offset]))
        pizzas = cursor.fetchall()
    except Error as e:
        return create_error_response(f"Failed to retrieve pizzas: {e}", 500)
    finally:
        # Release resources on every path, including query failures.
        if cursor is not None:
            cursor.close()
        conn.close()

    meta = {
        "total_records": total_records,
        "current_page": page,
        "page_size": limit,
        "total_pages": (total_records + limit - 1) // limit,  # ceiling division
    }
    return create_success_response(pizzas, meta=meta)
@app.route('/v1/pizzas/<int:pizza_id>', methods=['GET'])
def get_pizza_by_id(pizza_id):
    """Retrieve a single pizza by its ID.

    Args:
        pizza_id: Integer primary key taken from the URL.

    Returns:
        200 with the row under ``data``; 404 when no row has that id;
        500 when the database cannot be reached or the query fails.
    """
    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        pizza = cursor.fetchone()
    except Error as e:
        return create_error_response(f"Failed to retrieve pizza: {e}", 500)
    finally:
        # Close on every path so a failing query cannot leak the connection.
        if cursor is not None:
            cursor.close()
        conn.close()

    if pizza:
        return create_success_response(pizza)
    return create_error_response("Pizza not found", 404)
@app.route('/v1/pizzas', methods=['POST'])
def create_pizza():
    """Create a new pizza.

    Expects a JSON object with ``name`` (non-empty string), ``email``
    (non-empty string) and ``age`` (integer).

    Returns:
        201 with the stored row under ``data``; 400 when the payload is not
        a JSON object, a field is missing, or a field has the wrong type;
        409 on a duplicate-key error; 500 on connection or other database
        failures.
    """
    if not request.is_json:
        return create_error_response("Invalid input: payload must be JSON")
    data = request.get_json()
    # A JSON array/scalar has no .get(); reject it instead of crashing.
    if not isinstance(data, dict):
        return create_error_response("Invalid input: payload must be a JSON object")

    name = data.get('name')
    email = data.get('email')
    age = data.get('age')

    # Test for absence explicitly: a plain truthiness check would report a
    # legitimate age of 0 as a missing field.
    if name in (None, "") or email in (None, "") or age is None:
        return create_error_response("Missing required fields: 'name', 'email', 'age'")
    # bool is a subclass of int, so exclude it explicitly for 'age'.
    age_is_int = isinstance(age, int) and not isinstance(age, bool)
    if not isinstance(name, str) or not isinstance(email, str) or not age_is_int:
        return create_error_response("Invalid data type for fields.")

    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        query = f"INSERT INTO {DB_TABLE} (name, email, age) VALUES (%s, %s, %s)"
        cursor.execute(query, (name, email, age))
        new_pizza_id = cursor.lastrowid
        conn.commit()
        # Read the row back so the response reflects what was actually stored.
        cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (new_pizza_id,))
        new_pizza = cursor.fetchone()
        return create_success_response(new_pizza, "Pizza created successfully", 201)
    except Error as e:
        conn.rollback()
        # MySQL error 1062 = duplicate entry for a unique key.
        if e.errno == 1062:
            return create_error_response(f"Failed to create pizza: email '{email}' already exists.", 409)
        return create_error_response(f"Failed to create pizza: {e}", 500)
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()
@app.route('/v1/pizzas/<int:pizza_id>', methods=['PATCH'])
def update_pizza(pizza_id):
    """Update an existing pizza's information.

    Accepts a JSON object containing any of ``name``, ``email`` and ``age``;
    other keys are ignored.

    Args:
        pizza_id: Integer primary key taken from the URL.

    Returns:
        200 with the updated row under ``data``; 400 when the payload is
        empty, not a JSON object, or has no updatable field; 404 when the
        pizza does not exist; 409 on a duplicate-key error; 500 on
        connection or other database failures.
    """
    if not request.is_json:
        return create_error_response("Invalid input: payload must be JSON")
    data = request.get_json()
    if not data:
        return create_error_response("No update fields provided.")
    # A non-empty JSON array/scalar cannot be indexed by field name.
    if not isinstance(data, dict):
        return create_error_response("Invalid input: payload must be a JSON object")

    # Column names come from this fixed whitelist, never from the client, so
    # interpolating them into the SET clause is safe; values stay parameterized.
    updatable_columns = ('name', 'email', 'age')
    present_columns = [column for column in updatable_columns if column in data]
    update_fields = [f"{column} = %s" for column in present_columns]
    update_values = [data[column] for column in present_columns]

    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        # Existence is checked first so an unknown id yields 404 even when the
        # payload itself would have been rejected.
        cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        if not cursor.fetchone():
            return create_error_response("Pizza not found", 404)
        if not update_fields:
            return create_error_response("No valid update fields provided.")

        query = f"UPDATE {DB_TABLE} SET {', '.join(update_fields)} WHERE id = %s"
        cursor.execute(query, tuple(update_values + [pizza_id]))
        conn.commit()
        # Fetch and return the updated pizza data
        cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        updated_pizza = cursor.fetchone()
        return create_success_response(updated_pizza, "Pizza updated successfully")
    except Error as e:
        conn.rollback()
        # MySQL error 1062 = duplicate entry for a unique key.
        if e.errno == 1062:
            if 'email' in data:
                return create_error_response(f"Failed to update pizza: email '{data['email']}' already exists.", 409)
            # The duplicate was on some other unique column; do not index a
            # key the client never sent.
            return create_error_response("Failed to update pizza: duplicate value for a unique field.", 409)
        return create_error_response(f"Failed to update pizza: {e}", 500)
    finally:
        # Runs for every return above, so no early exit can leak the connection.
        if cursor is not None:
            cursor.close()
        conn.close()
@app.route('/v1/pizzas/<int:pizza_id>', methods=['DELETE'])
def delete_pizza(pizza_id):
    """Delete a pizza by its ID.

    Args:
        pizza_id: Integer primary key taken from the URL.

    Returns:
        204 with an empty body when a row was deleted; 404 when no row has
        that id; 500 on connection or other database failures.
    """
    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor()
        # A single DELETE plus rowcount replaces the separate existence query:
        # it answers "did the row exist?" without a window between check and
        # delete, and keeps all database work inside the try/finally.
        cursor.execute(f"DELETE FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        conn.commit()
        if cursor.rowcount == 0:
            return create_error_response("Pizza not found", 404)
        return '', 204 # No Content
    except Error as e:
        conn.rollback()
        return create_error_response(f"Failed to delete pizza: {e}", 500)
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()
# --- Main Execution ---
if __name__ == '__main__':
    # Development entry point only. It's recommended to use a production-ready
    # WSGI server like Gunicorn or uWSGI for real deployments.
    # Debug mode stays on by default (as before) but can now be switched off
    # with FLASK_DEBUG=0/false/no; an explicit debug=True passed to app.run
    # would otherwise override that variable. Never expose a debug-mode server
    # to untrusted networks: the interactive debugger allows code execution.
    debug_flag = os.environ.get("FLASK_DEBUG")
    debug_enabled = True if debug_flag is None else debug_flag.strip().lower() not in ("", "0", "false", "no")
    app.run(debug=debug_enabled, port=5000)