import os

from flask import Flask, jsonify, request
from flask_cors import CORS
import mysql.connector
from mysql.connector import Error

# --- Database Configuration ---
# It's recommended to use environment variables for security.
# NOTE(review): the fallback values below embed connection details (including
# a password) in source control. They are kept so existing deployments keep
# working, but they should be rotated and removed in favour of env-only config.
DB_HOST = os.getenv("DB_HOST", "mysql.theaken.com")
DB_PORT = int(os.getenv("DB_PORT", 33306))
DB_NAME = os.getenv("DB_NAME", "db_A027")
DB_USER = os.getenv("DB_USER", "A027")
DB_PASSWORD = os.getenv("DB_PASSWORD", "E1CelfxqlKoj")
DB_TABLE = "pizzas"

# MySQL error number the handlers treat as "duplicate entry".
DUPLICATE_ENTRY_ERRNO = 1062

# Columns a PATCH request is allowed to modify. Column names in UPDATE
# statements are taken from this tuple only, never from the request body.
UPDATABLE_FIELDS = ("name", "email", "age")

# --- Flask App Initialization ---
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes, allowing local frontend development


# --- Database Connection ---
def get_db_connection():
    """Establish a connection to the MySQL database.

    Returns:
        An open connection built from the DB_* settings, or ``None`` when
        the connector raises ``Error`` (the error is printed).
    """
    try:
        conn = mysql.connector.connect(
            host=DB_HOST,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
        )
        return conn
    except Error as e:
        print(f"Error connecting to MySQL database: {e}")
        return None


def _close_resources(cursor, conn):
    """Close the cursor (when one was created) and then the connection."""
    try:
        if cursor is not None:
            cursor.close()
    finally:
        conn.close()


def _is_int(value):
    """Return True for real integers; booleans are rejected although bool subclasses int."""
    return isinstance(value, int) and not isinstance(value, bool)


# --- Response Formatting Helpers ---
def create_success_response(data, message="Success", code=200, meta=None):
    """Create a standardized success JSON response.

    Args:
        data: Payload stored under ``"data"``; omitted when ``None``.
        message: Human-readable status message.
        code: HTTP status code, also echoed in the body.
        meta: Optional metadata stored under ``"meta"``; omitted when ``None``.

    Returns:
        A ``(response, status_code)`` tuple.
    """
    response = {
        "status": "success",
        "code": code,
        "message": message,
    }
    if data is not None:
        response["data"] = data
    if meta is not None:
        response["meta"] = meta
    return jsonify(response), code


def create_error_response(message, code=400):
    """Create a standardized error JSON response.

    Args:
        message: Human-readable error description.
        code: HTTP status code, also echoed in the body.

    Returns:
        A ``(response, status_code)`` tuple.
    """
    return jsonify({
        "status": "error",
        "code": code,
        "message": message,
    }), code


# --- API Routes ---
@app.route('/v1/pizzas', methods=['GET'])
def get_pizzas():
    """Retrieve a list of pizzas with optional filtering and pagination.

    Query parameters:
        page: 1-based page number (default 1).
        limit: Page size (default 10).
        min_age / max_age: Optional inclusive integer bounds on ``age``.

    Returns:
        200 with the rows and pagination metadata, 400 for invalid query
        parameters, 500 when the database is unreachable or a query fails.
    """
    # Validate every parameter before opening a connection so that a bad
    # request never leaves a connection behind.
    try:
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 10))
    except ValueError:
        return create_error_response("Invalid 'page' or 'limit' parameter. Must be integers.")
    if page < 1 or limit < 1:
        # limit == 0 would divide by zero below; non-positive values would
        # produce a negative LIMIT/OFFSET in the SQL statement.
        return create_error_response("Invalid 'page' or 'limit' parameter. Must be positive integers.")
    offset = (page - 1) * limit

    query_params = []
    where_clauses = []
    for param, clause in (("min_age", "age >= %s"), ("max_age", "age <= %s")):
        raw_value = request.args.get(param)
        if not raw_value:
            continue
        try:
            query_params.append(int(raw_value))
        except ValueError:
            return create_error_response(f"Invalid '{param}' parameter. Must be an integer.")
        where_clauses.append(clause)

    base_query = f"FROM {DB_TABLE}"
    if where_clauses:
        base_query += " WHERE " + " AND ".join(where_clauses)

    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        # Get total count for metadata
        cursor.execute(f"SELECT COUNT(*) as total {base_query}", tuple(query_params))
        total_records = cursor.fetchone()['total']

        # Get paginated data
        data_query = f"SELECT id, name, email, age {base_query} ORDER BY id ASC LIMIT %s OFFSET %s"
        cursor.execute(data_query, tuple(query_params + [limit, offset]))
        pizzas = cursor.fetchall()
    except Error as e:
        return create_error_response(f"Failed to retrieve pizzas: {e}", 500)
    finally:
        _close_resources(cursor, conn)

    meta = {
        "total_records": total_records,
        "current_page": page,
        "page_size": limit,
        "total_pages": (total_records + limit - 1) // limit,  # ceiling division
    }
    return create_success_response(pizzas, meta=meta)


@app.route('/v1/pizzas/<int:pizza_id>', methods=['GET'])
def get_pizza_by_id(pizza_id):
    """Retrieve a single pizza by its ID.

    Returns:
        200 with the row, 404 when no row has that id, 500 when the
        database is unreachable or the query fails.
    """
    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        pizza = cursor.fetchone()
    except Error as e:
        return create_error_response(f"Failed to retrieve pizza: {e}", 500)
    finally:
        _close_resources(cursor, conn)

    if pizza:
        return create_success_response(pizza)
    return create_error_response("Pizza not found", 404)


@app.route('/v1/pizzas', methods=['POST'])
def create_pizza():
    """Create a new pizza.

    Expects a JSON object with ``name`` (str), ``email`` (str) and
    ``age`` (int).

    Returns:
        201 with the created row, 400 for an invalid payload, 409 on a
        duplicate-entry error, 500 for other database failures.
    """
    if not request.is_json:
        return create_error_response("Invalid input: payload must be JSON")

    # silent=True turns an unparsable body into None instead of an exception,
    # so malformed JSON gets the same standardized error as a non-object body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return create_error_response("Invalid input: payload must be a JSON object")

    name = data.get('name')
    email = data.get('email')
    age = data.get('age')

    # age is checked against None rather than truthiness so that 0 is not
    # reported as a missing field.
    if not name or not email or age is None:
        return create_error_response("Missing required fields: 'name', 'email', 'age'")
    if not isinstance(name, str) or not isinstance(email, str) or not _is_int(age):
        return create_error_response("Invalid data type for fields.")

    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        query = f"INSERT INTO {DB_TABLE} (name, email, age) VALUES (%s, %s, %s)"
        cursor.execute(query, (name, email, age))
        new_pizza_id = cursor.lastrowid
        conn.commit()

        # Fetch the newly created pizza
        cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (new_pizza_id,))
        new_pizza = cursor.fetchone()
        return create_success_response(new_pizza, "Pizza created successfully", 201)
    except Error as e:
        conn.rollback()
        # Check for duplicate entry
        if e.errno == DUPLICATE_ENTRY_ERRNO:
            return create_error_response(f"Failed to create pizza: email '{email}' already exists.", 409)
        return create_error_response(f"Failed to create pizza: {e}", 500)
    finally:
        _close_resources(cursor, conn)


@app.route('/v1/pizzas/<int:pizza_id>', methods=['PATCH'])
def update_pizza(pizza_id):
    """Update an existing pizza's information.

    Expects a JSON object containing any of ``name`` (str), ``email`` (str)
    and ``age`` (int); other keys are ignored.

    Returns:
        200 with the updated row, 400 for an invalid payload, 404 when the
        pizza does not exist, 409 on a duplicate-entry error, 500 for other
        database failures.
    """
    if not request.is_json:
        return create_error_response("Invalid input: payload must be JSON")

    data = request.get_json(silent=True)
    if data is not None and not isinstance(data, dict):
        return create_error_response("Invalid input: payload must be a JSON object")
    if not data:
        return create_error_response("No update fields provided.")

    # Payload validation happens before connecting so that no early return
    # can leave a connection open.
    present_fields = [field for field in UPDATABLE_FIELDS if field in data]
    if not present_fields:
        return create_error_response("No valid update fields provided.")

    for field in present_fields:
        value = data[field]
        valid = _is_int(value) if field == 'age' else isinstance(value, str)
        if not valid:
            return create_error_response("Invalid data type for fields.")

    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        # Check if pizza exists
        cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        if not cursor.fetchone():
            return create_error_response("Pizza not found", 404)

        assignments = ', '.join(f"{field} = %s" for field in present_fields)
        update_values = [data[field] for field in present_fields]
        update_values.append(pizza_id)

        query = f"UPDATE {DB_TABLE} SET {assignments} WHERE id = %s"
        cursor.execute(query, tuple(update_values))
        conn.commit()

        # Fetch and return the updated pizza data
        cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        updated_pizza = cursor.fetchone()
        return create_success_response(updated_pizza, "Pizza updated successfully")
    except Error as e:
        conn.rollback()
        if e.errno == DUPLICATE_ENTRY_ERRNO:
            # .get() because the request may not have included 'email' at all.
            return create_error_response(
                f"Failed to update pizza: email '{data.get('email')}' already exists.", 409
            )
        return create_error_response(f"Failed to update pizza: {e}", 500)
    finally:
        _close_resources(cursor, conn)


@app.route('/v1/pizzas/<int:pizza_id>', methods=['DELETE'])
def delete_pizza(pizza_id):
    """Delete a pizza by its ID.

    Returns:
        204 with an empty body on success, 404 when no row was deleted,
        500 when the database is unreachable or the statement fails.
    """
    conn = get_db_connection()
    if not conn:
        return create_error_response("Database connection failed", 500)

    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(f"DELETE FROM {DB_TABLE} WHERE id = %s", (pizza_id,))
        conn.commit()

        # A single DELETE plus rowcount replaces the separate existence
        # query: zero affected rows means the id was not present.
        if cursor.rowcount == 0:
            return create_error_response("Pizza not found", 404)
        return '', 204  # No Content
    except Error as e:
        conn.rollback()
        return create_error_response(f"Failed to delete pizza: {e}", 500)
    finally:
        _close_resources(cursor, conn)


# --- Main Execution ---
if __name__ == '__main__':
    # It's recommended to use a production-ready WSGI server like Gunicorn or uWSGI.
    # Debug mode stays on by default for local development; set FLASK_DEBUG=0
    # to disable the interactive debugger, which must never be exposed publicly.
    app.run(debug=os.getenv("FLASK_DEBUG", "1") == "1", port=5000)