445 lines
16 KiB
Python
445 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Integration tests for authentication routes and permission middleware."""
|
|
|
|
import json
|
|
import pytest
|
|
from unittest.mock import patch, MagicMock
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
|
|
|
import mes_dashboard.core.database as db
|
|
from mes_dashboard.app import create_app
|
|
from mes_dashboard.services import page_registry
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _ldap_defaults(monkeypatch):
|
|
monkeypatch.setattr("mes_dashboard.services.auth_service.LDAP_API_BASE", "https://ldap.panjit.example")
|
|
monkeypatch.setattr("mes_dashboard.services.auth_service.LDAP_CONFIG_ERROR", None)
|
|
|
|
|
|
@pytest.fixture
|
|
def temp_page_status(tmp_path):
|
|
"""Create temporary page status file."""
|
|
data_file = tmp_path / "page_status.json"
|
|
initial_data = {
|
|
"pages": [
|
|
{"route": "/", "name": "Portal", "status": "released"},
|
|
{"route": "/wip-overview", "name": "WIP Overview", "status": "released"},
|
|
{"route": "/dev-feature", "name": "Dev Feature", "status": "dev"},
|
|
],
|
|
"api_public": True
|
|
}
|
|
data_file.write_text(json.dumps(initial_data), encoding="utf-8")
|
|
return data_file
|
|
|
|
|
|
@pytest.fixture
|
|
def app(temp_page_status):
|
|
"""Create application for testing."""
|
|
db._ENGINE = None
|
|
|
|
# Mock page registry to use temp file
|
|
original_data_file = page_registry.DATA_FILE
|
|
original_cache = page_registry._cache
|
|
page_registry.DATA_FILE = temp_page_status
|
|
page_registry._cache = None
|
|
|
|
app = create_app('testing')
|
|
app.config['TESTING'] = True
|
|
app.config['WTF_CSRF_ENABLED'] = False
|
|
|
|
yield app
|
|
|
|
# Restore
|
|
page_registry.DATA_FILE = original_data_file
|
|
page_registry._cache = original_cache
|
|
|
|
|
|
@pytest.fixture
|
|
def client(app):
|
|
"""Create test client."""
|
|
return app.test_client()
|
|
|
|
|
|
class TestLoginRoute:
|
|
"""Tests for login route."""
|
|
|
|
def test_login_page_renders(self, client):
|
|
"""Test login page is accessible."""
|
|
response = client.get("/admin/login")
|
|
assert response.status_code == 200
|
|
assert "管理員登入" in response.data.decode("utf-8") or "login" in response.data.decode("utf-8").lower()
|
|
|
|
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
|
|
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
|
|
@patch('mes_dashboard.services.auth_service.requests.post')
|
|
def test_login_success(self, mock_post, _mock_is_admin, client):
|
|
"""Test successful login via LDAP."""
|
|
# Mock LDAP response
|
|
mock_response = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"success": True,
|
|
"user": {
|
|
"username": "92367",
|
|
"displayName": "Admin User",
|
|
"mail": "ymirliu@panjit.com.tw",
|
|
"department": "Test Dept"
|
|
}
|
|
}
|
|
mock_post.return_value = mock_response
|
|
|
|
response = client.post("/admin/login", data={
|
|
"username": "92367",
|
|
"password": "password123"
|
|
}, follow_redirects=False)
|
|
|
|
# Should redirect after successful login
|
|
assert response.status_code == 302
|
|
|
|
# Check session contains admin
|
|
with client.session_transaction() as sess:
|
|
assert "admin" in sess
|
|
assert sess["admin"]["username"] == "92367"
|
|
|
|
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
|
|
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
|
|
@patch('mes_dashboard.services.auth_service.requests.post')
|
|
def test_login_blocks_external_next_redirect(self, mock_post, _mock_is_admin, client):
|
|
"""Should ignore external next URL and redirect to portal."""
|
|
mock_response = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"success": True,
|
|
"user": {
|
|
"username": "92367",
|
|
"displayName": "Admin User",
|
|
"mail": "ymirliu@panjit.com.tw",
|
|
"department": "Test Dept",
|
|
},
|
|
}
|
|
mock_post.return_value = mock_response
|
|
|
|
response = client.post(
|
|
"/admin/login?next=https://evil.example/phish",
|
|
data={"username": "92367", "password": "password123"},
|
|
follow_redirects=False,
|
|
)
|
|
|
|
assert response.status_code == 302
|
|
assert "evil.example" not in response.location
|
|
assert response.location.endswith("/")
|
|
|
|
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
|
|
@patch('mes_dashboard.routes.auth_routes.is_admin', return_value=True)
|
|
@patch('mes_dashboard.services.auth_service.requests.post')
|
|
def test_login_allows_internal_next_redirect(self, mock_post, _mock_is_admin, client):
|
|
"""Should keep validated local path in next URL."""
|
|
mock_response = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"success": True,
|
|
"user": {
|
|
"username": "92367",
|
|
"displayName": "Admin User",
|
|
"mail": "ymirliu@panjit.com.tw",
|
|
"department": "Test Dept",
|
|
},
|
|
}
|
|
mock_post.return_value = mock_response
|
|
|
|
response = client.post(
|
|
"/admin/login?next=/admin/pages",
|
|
data={"username": "92367", "password": "password123"},
|
|
follow_redirects=False,
|
|
)
|
|
|
|
assert response.status_code == 302
|
|
assert response.location.endswith("/admin/pages")
|
|
|
|
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
|
|
@patch('mes_dashboard.services.auth_service.requests.post')
|
|
def test_login_invalid_credentials(self, mock_post, client):
|
|
"""Test login with invalid credentials via LDAP."""
|
|
mock_response = MagicMock()
|
|
mock_response.json.return_value = {"success": False}
|
|
mock_post.return_value = mock_response
|
|
|
|
response = client.post("/admin/login", data={
|
|
"username": "wrong",
|
|
"password": "wrong"
|
|
})
|
|
|
|
assert response.status_code == 200
|
|
# Should show error message
|
|
assert "錯誤" in response.data.decode("utf-8") or "error" in response.data.decode("utf-8").lower()
|
|
|
|
@patch('mes_dashboard.services.auth_service.LOCAL_AUTH_ENABLED', False)
|
|
@patch('mes_dashboard.services.auth_service.requests.post')
|
|
def test_login_non_admin_user(self, mock_post, client):
|
|
"""Test login with non-admin user via LDAP."""
|
|
mock_response = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"success": True,
|
|
"user": {
|
|
"username": "99999",
|
|
"displayName": "Regular User",
|
|
"mail": "regular@panjit.com.tw",
|
|
"department": "Test Dept"
|
|
}
|
|
}
|
|
mock_post.return_value = mock_response
|
|
|
|
response = client.post("/admin/login", data={
|
|
"username": "99999",
|
|
"password": "password123"
|
|
})
|
|
|
|
assert response.status_code == 200
|
|
# Should show non-admin error
|
|
content = response.data.decode("utf-8")
|
|
assert "管理員" in content or "admin" in content.lower()
|
|
|
|
def test_login_empty_credentials(self, client):
|
|
"""Test login with empty credentials."""
|
|
response = client.post("/admin/login", data={
|
|
"username": "",
|
|
"password": ""
|
|
})
|
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
class TestLogoutRoute:
|
|
"""Tests for logout route."""
|
|
|
|
def test_logout(self, client):
|
|
"""Test logout clears session."""
|
|
# Login first
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.get("/admin/logout", follow_redirects=False)
|
|
|
|
assert response.status_code == 302
|
|
|
|
with client.session_transaction() as sess:
|
|
assert "admin" not in sess
|
|
|
|
|
|
class TestPermissionMiddleware:
|
|
"""Tests for permission middleware."""
|
|
|
|
def test_released_page_accessible_without_login(self, client):
|
|
"""Test released pages are accessible without login."""
|
|
response = client.get("/wip-overview")
|
|
# Should not be 403 (might be 200 or redirect)
|
|
assert response.status_code != 403
|
|
|
|
def test_dev_page_returns_403_without_login(self, client, temp_page_status):
|
|
"""Test dev pages return 403 for non-admin."""
|
|
# Add a dev route that exists in the app
|
|
# First update page status to have an existing route as dev
|
|
data = json.loads(temp_page_status.read_text())
|
|
data["pages"].append({"route": "/tables", "name": "Tables", "status": "dev"})
|
|
temp_page_status.write_text(json.dumps(data))
|
|
page_registry._cache = None
|
|
|
|
response = client.get("/tables")
|
|
assert response.status_code == 403
|
|
|
|
def test_dev_page_accessible_with_admin_login(self, client, temp_page_status):
|
|
"""Test dev pages are accessible for admin."""
|
|
# Update tables to dev
|
|
data = json.loads(temp_page_status.read_text())
|
|
data["pages"].append({"route": "/tables", "name": "Tables", "status": "dev"})
|
|
temp_page_status.write_text(json.dumps(data))
|
|
page_registry._cache = None
|
|
|
|
# Login as admin
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin", "mail": "admin@test.com"}
|
|
|
|
response = client.get("/tables")
|
|
assert response.status_code != 403
|
|
|
|
def test_admin_pages_redirect_without_login(self, client):
|
|
"""Test admin pages redirect to login without authentication."""
|
|
response = client.get("/admin/pages", follow_redirects=False)
|
|
assert response.status_code == 302
|
|
assert "/admin/login" in response.location
|
|
|
|
def test_admin_pages_accessible_with_login(self, client):
|
|
"""Test admin pages are accessible with login."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin", "mail": "admin@test.com"}
|
|
|
|
response = client.get("/admin/pages")
|
|
assert response.status_code == 200
|
|
|
|
|
|
class TestAdminAPI:
|
|
"""Tests for admin API endpoints."""
|
|
|
|
def test_get_pages_without_login(self, client):
|
|
"""Test get pages API requires login."""
|
|
response = client.get("/admin/api/pages")
|
|
# Should redirect
|
|
assert response.status_code == 302
|
|
|
|
def test_get_pages_with_login(self, client):
|
|
"""Test get pages API with login."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.get("/admin/api/pages")
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data["success"] is True
|
|
assert "pages" in data
|
|
|
|
def test_get_drawers_without_login(self, client):
|
|
"""Test drawer API requires login."""
|
|
response = client.get("/admin/api/drawers", follow_redirects=False)
|
|
assert response.status_code == 302
|
|
|
|
def test_mutate_drawers_without_login(self, client):
|
|
"""Test drawer mutations require login."""
|
|
response = client.post(
|
|
"/admin/api/drawers",
|
|
data=json.dumps({"name": "Unauthorized Drawer"}),
|
|
content_type="application/json",
|
|
follow_redirects=False,
|
|
)
|
|
assert response.status_code in (302, 401)
|
|
|
|
response = client.delete("/admin/api/drawers/reports", follow_redirects=False)
|
|
assert response.status_code == 302
|
|
|
|
def test_get_drawers_with_login(self, client):
|
|
"""Test list drawers API with login."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.get("/admin/api/drawers")
|
|
assert response.status_code == 200
|
|
data = json.loads(response.data)
|
|
assert data["success"] is True
|
|
assert "drawers" in data
|
|
assert any(drawer["id"] == "reports" for drawer in data["drawers"])
|
|
|
|
def test_create_drawer_duplicate_name_conflict(self, client):
|
|
"""Test creating duplicate drawer name returns 409."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.post(
|
|
"/admin/api/drawers",
|
|
data=json.dumps({"name": "報表類", "order": 99}),
|
|
content_type="application/json",
|
|
)
|
|
assert response.status_code == 409
|
|
|
|
def test_update_page_status(self, client, temp_page_status):
|
|
"""Test updating page status via API."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.put(
|
|
"/admin/api/pages/wip-overview",
|
|
data=json.dumps({"status": "dev"}),
|
|
content_type="application/json"
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = json.loads(response.data)
|
|
assert data["success"] is True
|
|
|
|
# Verify status changed
|
|
page_registry._cache = None
|
|
assert page_registry.get_page_status("/wip-overview") == "dev"
|
|
|
|
def test_update_page_invalid_status(self, client):
|
|
"""Test updating page with invalid status."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.put(
|
|
"/admin/api/pages/wip-overview",
|
|
data=json.dumps({"status": "invalid"}),
|
|
content_type="application/json"
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
|
|
def test_update_page_drawer_assignment(self, client):
|
|
"""Test assigning page drawer via page update API."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.put(
|
|
"/admin/api/pages/wip-overview",
|
|
data=json.dumps({"drawer_id": "queries", "order": 3}),
|
|
content_type="application/json",
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
page_registry._cache = None
|
|
pages = page_registry.get_all_pages()
|
|
page = next(item for item in pages if item["route"] == "/wip-overview")
|
|
assert page["drawer_id"] == "queries"
|
|
assert page["order"] == 3
|
|
|
|
def test_update_page_invalid_drawer_assignment(self, client):
|
|
"""Test assigning a non-existent drawer returns bad request."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.put(
|
|
"/admin/api/pages/wip-overview",
|
|
data=json.dumps({"drawer_id": "missing-drawer"}),
|
|
content_type="application/json",
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
def test_delete_drawer_with_assigned_pages_conflict(self, client):
|
|
"""Test deleting a non-empty drawer returns conflict."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin"}
|
|
|
|
response = client.delete("/admin/api/drawers/reports")
|
|
assert response.status_code == 409
|
|
|
|
|
|
class TestContextProcessor:
|
|
"""Tests for SPA shell auth context surface."""
|
|
|
|
def test_is_admin_in_context_when_logged_in(self, client):
|
|
"""Test navigation API exposes admin context when logged in."""
|
|
with client.session_transaction() as sess:
|
|
sess["admin"] = {"username": "admin", "displayName": "Admin"}
|
|
|
|
response = client.get("/api/portal/navigation")
|
|
assert response.status_code == 200
|
|
payload = response.get_json()
|
|
assert payload["is_admin"] is True
|
|
assert payload["admin_user"]["username"] == "admin"
|
|
assert payload["admin_links"]["logout"] == "/admin/logout"
|
|
|
|
def test_is_admin_in_context_when_not_logged_in(self, client):
|
|
"""Test navigation API hides admin context when not logged in."""
|
|
response = client.get("/api/portal/navigation")
|
|
assert response.status_code == 200
|
|
payload = response.get_json()
|
|
assert payload["is_admin"] is False
|
|
assert payload["admin_user"] is None
|
|
assert payload["admin_links"]["logout"] is None
|
|
assert payload["admin_links"]["login"].startswith("/admin/login?next=")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|