From f90a8a57b46d484c6bb35697c64ea8e959ff7825 Mon Sep 17 00:00:00 2001 From: egg Date: Wed, 11 Feb 2026 10:44:56 +0800 Subject: [PATCH] fix(security): add table_name whitelist to prevent SQL injection in table query APIs The /api/query_table and /api/get_table_columns endpoints accepted arbitrary table_name values that were interpolated directly into SQL f-strings. Since api_public is true, any unauthenticated user could exploit this. Now validates table_name and time_field against TABLES_CONFIG before reaching the database. Co-Authored-By: Claude Opus 4.6 --- src/mes_dashboard/app.py | 29 +++++++++++++++++++++++++---- tests/test_api_integration.py | 20 ++++++++++++++++---- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/mes_dashboard/app.py b/src/mes_dashboard/app.py index 146b164..b09cbb7 100644 --- a/src/mes_dashboard/app.py +++ b/src/mes_dashboard/app.py @@ -498,6 +498,25 @@ def create_app(config_name: str | None = None) -> Flask: # Table Query APIs (for table_data_viewer) # ======================================================== + from mes_dashboard.config.tables import TABLES_CONFIG + + _ALLOWED_TABLES: dict[str, dict] = {} + for _tables in TABLES_CONFIG.values(): + for _tbl in _tables: + _ALLOWED_TABLES[_tbl['name']] = _tbl + + def _validate_table_request(table_name, time_field=None): + """Validate table_name against TABLES_CONFIG whitelist.""" + if not table_name: + return '請指定表名', 400 + if table_name not in _ALLOWED_TABLES: + return '不允許查詢此表', 400 + if time_field is not None: + allowed_time = _ALLOWED_TABLES[table_name].get('time_field') + if time_field != allowed_time: + return '不允許的時間欄位', 400 + return None, None + @app.route('/api/query_table', methods=['POST']) def query_table(): """API: query table data with optional column filters.""" @@ -507,8 +526,9 @@ def create_app(config_name: str | None = None) -> Flask: time_field = data.get('time_field') filters = data.get('filters') - if not table_name: - return jsonify({'error': '請指定表名'}), 400 + error, status = _validate_table_request(table_name, time_field) + if error: + return jsonify({'error': error}), status result = get_table_data(table_name, limit, time_field, filters) return jsonify(result) @@ -519,8 +539,9 @@ def create_app(config_name: str | None = None) -> Flask: data = request.get_json() table_name = data.get('table_name') - if not table_name: - return jsonify({'error': '請指定表名'}), 400 + error, status = _validate_table_request(table_name) + if error: + return jsonify({'error': error}), status columns = get_table_columns(table_name) return jsonify({'columns': columns}) diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py index d148168..63730d0 100644 --- a/tests/test_api_integration.py +++ b/tests/test_api_integration.py @@ -30,7 +30,7 @@ class TestTableQueryAPIIntegration(unittest.TestCase): response = self.client.post( '/api/get_table_columns', - json={'table_name': 'TEST_TABLE'}, + json={'table_name': 'DWH.DW_MES_LOT_V'}, content_type='application/json' ) @@ -51,6 +51,18 @@ class TestTableQueryAPIIntegration(unittest.TestCase): data = json.loads(response.data) self.assertIn('error', data) + def test_query_table_rejects_unlisted_table(self): + """Query table with table_name not in TABLES_CONFIG should return 400.""" + response = self.client.post( + '/api/query_table', + json={'table_name': 'EVIL_TABLE; DROP TABLE --'}, + content_type='application/json' + ) + + self.assertEqual(response.status_code, 400) + data = json.loads(response.data) + self.assertIn('error', data) + @patch('mes_dashboard.app.get_table_data') def test_query_table_success(self, mock_get_data): """Query table should return JSON with data array.""" @@ -61,7 +73,7 @@ class TestTableQueryAPIIntegration(unittest.TestCase): response = self.client.post( '/api/query_table', - json={'table_name': 'TEST_TABLE', 'limit': 100}, + json={'table_name': 'DWH.DW_MES_LOT_V', 'limit': 100}, content_type='application/json' ) @@ -93,7 +105,7 @@ class TestTableQueryAPIIntegration(unittest.TestCase): response = self.client.post( '/api/query_table', json={ - 'table_name': 'TEST_TABLE', + 'table_name': 'DWH.DW_MES_LOT_V', 'limit': 100, 'filters': {'STATUS': 'ACTIVE'} }, @@ -277,7 +289,7 @@ class TestAPIContentType(unittest.TestCase): response = self.client.post( '/api/get_table_columns', - json={'table_name': 'TEST'}, + json={'table_name': 'DWH.DW_MES_LOT_V'}, content_type='application/json' )