fix(security): add table_name whitelist to prevent SQL injection in table query APIs

The /api/query_table and /api/get_table_columns endpoints accepted arbitrary
table_name values that were interpolated directly into SQL f-strings. Since
api_public is true, any unauthenticated user could exploit this. Now validates
table_name and time_field against TABLES_CONFIG before reaching the database.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
egg
2026-02-11 10:44:56 +08:00
parent dfaf0bc611
commit f90a8a57b4
2 changed files with 41 additions and 8 deletions

View File

@@ -498,6 +498,25 @@ def create_app(config_name: str | None = None) -> Flask:
# Table Query APIs (for table_data_viewer) # Table Query APIs (for table_data_viewer)
# ======================================================== # ========================================================
from mes_dashboard.config.tables import TABLES_CONFIG
_ALLOWED_TABLES: dict[str, dict] = {}
for _tables in TABLES_CONFIG.values():
for _tbl in _tables:
_ALLOWED_TABLES[_tbl['name']] = _tbl
def _validate_table_request(table_name, time_field=None):
"""Validate table_name against TABLES_CONFIG whitelist."""
if not table_name:
return '請指定表名', 400
if table_name not in _ALLOWED_TABLES:
return '不允許查詢此表', 400
if time_field is not None:
allowed_time = _ALLOWED_TABLES[table_name].get('time_field')
if time_field != allowed_time:
return '不允許的時間欄位', 400
return None, None
@app.route('/api/query_table', methods=['POST']) @app.route('/api/query_table', methods=['POST'])
def query_table(): def query_table():
"""API: query table data with optional column filters.""" """API: query table data with optional column filters."""
@@ -507,8 +526,9 @@ def create_app(config_name: str | None = None) -> Flask:
time_field = data.get('time_field') time_field = data.get('time_field')
filters = data.get('filters') filters = data.get('filters')
if not table_name: error, status = _validate_table_request(table_name, time_field)
return jsonify({'error': '請指定表名'}), 400 if error:
return jsonify({'error': error}), status
result = get_table_data(table_name, limit, time_field, filters) result = get_table_data(table_name, limit, time_field, filters)
return jsonify(result) return jsonify(result)
@@ -519,8 +539,9 @@ def create_app(config_name: str | None = None) -> Flask:
data = request.get_json() data = request.get_json()
table_name = data.get('table_name') table_name = data.get('table_name')
if not table_name: error, status = _validate_table_request(table_name)
return jsonify({'error': '請指定表名'}), 400 if error:
return jsonify({'error': error}), status
columns = get_table_columns(table_name) columns = get_table_columns(table_name)
return jsonify({'columns': columns}) return jsonify({'columns': columns})

View File

@@ -30,7 +30,7 @@ class TestTableQueryAPIIntegration(unittest.TestCase):
response = self.client.post( response = self.client.post(
'/api/get_table_columns', '/api/get_table_columns',
json={'table_name': 'TEST_TABLE'}, json={'table_name': 'DWH.DW_MES_LOT_V'},
content_type='application/json' content_type='application/json'
) )
@@ -51,6 +51,18 @@ class TestTableQueryAPIIntegration(unittest.TestCase):
data = json.loads(response.data) data = json.loads(response.data)
self.assertIn('error', data) self.assertIn('error', data)
def test_query_table_rejects_unlisted_table(self):
"""Query table with table_name not in TABLES_CONFIG should return 400."""
response = self.client.post(
'/api/query_table',
json={'table_name': 'EVIL_TABLE; DROP TABLE --'},
content_type='application/json'
)
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertIn('error', data)
@patch('mes_dashboard.app.get_table_data') @patch('mes_dashboard.app.get_table_data')
def test_query_table_success(self, mock_get_data): def test_query_table_success(self, mock_get_data):
"""Query table should return JSON with data array.""" """Query table should return JSON with data array."""
@@ -61,7 +73,7 @@ class TestTableQueryAPIIntegration(unittest.TestCase):
response = self.client.post( response = self.client.post(
'/api/query_table', '/api/query_table',
json={'table_name': 'TEST_TABLE', 'limit': 100}, json={'table_name': 'DWH.DW_MES_LOT_V', 'limit': 100},
content_type='application/json' content_type='application/json'
) )
@@ -93,7 +105,7 @@ class TestTableQueryAPIIntegration(unittest.TestCase):
response = self.client.post( response = self.client.post(
'/api/query_table', '/api/query_table',
json={ json={
'table_name': 'TEST_TABLE', 'table_name': 'DWH.DW_MES_LOT_V',
'limit': 100, 'limit': 100,
'filters': {'STATUS': 'ACTIVE'} 'filters': {'STATUS': 'ACTIVE'}
}, },
@@ -277,7 +289,7 @@ class TestAPIContentType(unittest.TestCase):
response = self.client.post( response = self.client.post(
'/api/get_table_columns', '/api/get_table_columns',
json={'table_name': 'TEST'}, json={'table_name': 'DWH.DW_MES_LOT_V'},
content_type='application/json' content_type='application/json'
) )