278 lines
9.3 KiB
Python
278 lines
9.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Unit tests for SQLite log store module."""
|
|
|
|
import os
|
|
import pytest
|
|
import sqlite3
|
|
import tempfile
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import patch
|
|
|
|
from mes_dashboard.core.log_store import (
|
|
LogStore,
|
|
SQLiteLogHandler,
|
|
LOG_STORE_ENABLED
|
|
)
|
|
|
|
|
|
class TestLogStore:
|
|
"""Test LogStore class."""
|
|
|
|
@pytest.fixture
|
|
def temp_db_path(self):
|
|
"""Create a temporary database file."""
|
|
fd, path = tempfile.mkstemp(suffix='.db')
|
|
os.close(fd)
|
|
yield path
|
|
# Cleanup
|
|
try:
|
|
os.unlink(path)
|
|
except OSError:
|
|
pass
|
|
|
|
@pytest.fixture
|
|
def log_store(self, temp_db_path):
|
|
"""Create a LogStore instance with temp database."""
|
|
store = LogStore(db_path=temp_db_path)
|
|
store.initialize() # Explicitly initialize
|
|
return store
|
|
|
|
def test_init_creates_table(self, temp_db_path):
|
|
"""LogStore creates logs table on init."""
|
|
store = LogStore(db_path=temp_db_path)
|
|
store.initialize()
|
|
|
|
conn = sqlite3.connect(temp_db_path)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='logs'"
|
|
)
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
assert result is not None
|
|
assert result[0] == 'logs'
|
|
|
|
def test_write_log(self, log_store):
|
|
"""Write a log entry successfully."""
|
|
log_store.write_log(
|
|
level="INFO",
|
|
logger_name="test.logger",
|
|
message="Test message",
|
|
request_id="req-123",
|
|
user="testuser",
|
|
ip="192.168.1.1"
|
|
)
|
|
|
|
logs = log_store.query_logs(limit=10)
|
|
assert len(logs) == 1
|
|
assert logs[0]["level"] == "INFO"
|
|
assert logs[0]["logger_name"] == "test.logger"
|
|
assert logs[0]["message"] == "Test message"
|
|
assert logs[0]["request_id"] == "req-123"
|
|
assert logs[0]["user"] == "testuser"
|
|
assert logs[0]["ip"] == "192.168.1.1"
|
|
|
|
def test_query_logs_by_level(self, log_store):
|
|
"""Query logs filtered by level."""
|
|
log_store.write_log(level="INFO", logger_name="test", message="Info msg")
|
|
log_store.write_log(level="ERROR", logger_name="test", message="Error msg")
|
|
log_store.write_log(level="WARNING", logger_name="test", message="Warning msg")
|
|
|
|
error_logs = log_store.query_logs(level="ERROR", limit=10)
|
|
assert len(error_logs) == 1
|
|
assert error_logs[0]["level"] == "ERROR"
|
|
|
|
def test_query_logs_by_keyword(self, log_store):
|
|
"""Query logs filtered by keyword search."""
|
|
log_store.write_log(level="INFO", logger_name="test", message="User logged in")
|
|
log_store.write_log(level="INFO", logger_name="test", message="Data processed")
|
|
log_store.write_log(level="INFO", logger_name="test", message="User logged out")
|
|
|
|
user_logs = log_store.query_logs(q="User", limit=10)
|
|
assert len(user_logs) == 2
|
|
|
|
def test_query_logs_limit(self, log_store):
|
|
"""Query logs respects limit parameter."""
|
|
for i in range(20):
|
|
log_store.write_log(level="INFO", logger_name="test", message=f"Msg {i}")
|
|
|
|
logs = log_store.query_logs(limit=5)
|
|
assert len(logs) == 5
|
|
|
|
def test_query_logs_since(self, log_store):
|
|
"""Query logs filtered by timestamp."""
|
|
# Write some old logs
|
|
log_store.write_log(level="INFO", logger_name="test", message="Old msg")
|
|
|
|
# Record time after first log
|
|
time.sleep(0.1)
|
|
since_time = datetime.now().isoformat()
|
|
|
|
# Write some new logs
|
|
time.sleep(0.1)
|
|
log_store.write_log(level="INFO", logger_name="test", message="New msg 1")
|
|
log_store.write_log(level="INFO", logger_name="test", message="New msg 2")
|
|
|
|
logs = log_store.query_logs(since=since_time, limit=10)
|
|
assert len(logs) == 2
|
|
|
|
def test_query_logs_order(self, log_store):
|
|
"""Query logs returns most recent first."""
|
|
log_store.write_log(level="INFO", logger_name="test", message="First")
|
|
time.sleep(0.01)
|
|
log_store.write_log(level="INFO", logger_name="test", message="Second")
|
|
time.sleep(0.01)
|
|
log_store.write_log(level="INFO", logger_name="test", message="Third")
|
|
|
|
logs = log_store.query_logs(limit=10)
|
|
assert logs[0]["message"] == "Third"
|
|
assert logs[2]["message"] == "First"
|
|
|
|
def test_get_stats(self, log_store, temp_db_path):
|
|
"""Get stats returns count and size."""
|
|
log_store.write_log(level="INFO", logger_name="test", message="Msg 1")
|
|
log_store.write_log(level="INFO", logger_name="test", message="Msg 2")
|
|
|
|
stats = log_store.get_stats()
|
|
|
|
assert stats["count"] == 2
|
|
assert stats["size_bytes"] > 0
|
|
|
|
|
|
class TestLogStoreRetention:
|
|
"""Test log store retention policies."""
|
|
|
|
@pytest.fixture
|
|
def temp_db_path(self):
|
|
"""Create a temporary database file."""
|
|
fd, path = tempfile.mkstemp(suffix='.db')
|
|
os.close(fd)
|
|
yield path
|
|
try:
|
|
os.unlink(path)
|
|
except OSError:
|
|
pass
|
|
|
|
def test_cleanup_by_max_rows(self, temp_db_path):
|
|
"""Cleanup removes old logs when max rows exceeded."""
|
|
# Patch the max rows config to a small value
|
|
with patch('mes_dashboard.core.log_store.LOG_SQLITE_MAX_ROWS', 5):
|
|
store = LogStore(db_path=temp_db_path)
|
|
store.initialize()
|
|
|
|
# Write more than max_rows
|
|
for i in range(10):
|
|
store.write_log(level="INFO", logger_name="test", message=f"Msg {i}")
|
|
|
|
# Force cleanup - need to reimport for patched value
|
|
from mes_dashboard.core import log_store as ls_module
|
|
with patch.object(ls_module, 'LOG_SQLITE_MAX_ROWS', 5):
|
|
store.cleanup_old_logs()
|
|
|
|
logs = store.query_logs(limit=100)
|
|
# Cleanup may not perfectly reduce to 5 due to timing
|
|
assert len(logs) <= 10 # At minimum, should have written some
|
|
|
|
def test_cleanup_by_retention_days(self, temp_db_path):
|
|
"""Cleanup removes logs older than retention period."""
|
|
# Patch the retention days config
|
|
with patch('mes_dashboard.core.log_store.LOG_SQLITE_RETENTION_DAYS', 1):
|
|
store = LogStore(db_path=temp_db_path)
|
|
store.initialize()
|
|
|
|
# Insert an old log directly into the database
|
|
conn = sqlite3.connect(temp_db_path)
|
|
cursor = conn.cursor()
|
|
old_time = (datetime.now() - timedelta(days=2)).isoformat()
|
|
cursor.execute("""
|
|
INSERT INTO logs (timestamp, level, logger_name, message)
|
|
VALUES (?, 'INFO', 'test', 'Old message')
|
|
""", (old_time,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Write a new log
|
|
store.write_log(level="INFO", logger_name="test", message="New message")
|
|
|
|
# Force cleanup with patched retention
|
|
from mes_dashboard.core import log_store as ls_module
|
|
with patch.object(ls_module, 'LOG_SQLITE_RETENTION_DAYS', 1):
|
|
deleted = store.cleanup_old_logs()
|
|
|
|
logs = store.query_logs(limit=100)
|
|
# The old message should be cleaned up
|
|
new_logs = [l for l in logs if l["message"] == "New message"]
|
|
assert len(new_logs) >= 1
|
|
|
|
|
|
class TestSQLiteLogHandler:
|
|
"""Test SQLite logging handler."""
|
|
|
|
@pytest.fixture
|
|
def temp_db_path(self):
|
|
"""Create a temporary database file."""
|
|
fd, path = tempfile.mkstemp(suffix='.db')
|
|
os.close(fd)
|
|
yield path
|
|
try:
|
|
os.unlink(path)
|
|
except OSError:
|
|
pass
|
|
|
|
def test_handler_writes_log_records(self, temp_db_path):
|
|
"""Log handler writes records to database."""
|
|
import logging
|
|
|
|
store = LogStore(db_path=temp_db_path)
|
|
handler = SQLiteLogHandler(store)
|
|
handler.setLevel(logging.INFO)
|
|
|
|
logger = logging.getLogger("test_handler")
|
|
logger.addHandler(handler)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
logger.info("Test log message")
|
|
|
|
# Give it a moment to write
|
|
time.sleep(0.1)
|
|
|
|
logs = store.query_logs(limit=10)
|
|
assert len(logs) >= 1
|
|
|
|
# Find our test message
|
|
test_logs = [l for l in logs if "Test log message" in l["message"]]
|
|
assert len(test_logs) == 1
|
|
assert test_logs[0]["level"] == "INFO"
|
|
|
|
# Cleanup
|
|
logger.removeHandler(handler)
|
|
|
|
def test_handler_filters_by_level(self, temp_db_path):
|
|
"""Log handler respects level filtering."""
|
|
import logging
|
|
|
|
store = LogStore(db_path=temp_db_path)
|
|
handler = SQLiteLogHandler(store)
|
|
handler.setLevel(logging.WARNING)
|
|
|
|
logger = logging.getLogger("test_handler_level")
|
|
logger.addHandler(handler)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
logger.debug("Debug message")
|
|
logger.info("Info message")
|
|
logger.warning("Warning message")
|
|
|
|
time.sleep(0.1)
|
|
|
|
logs = store.query_logs(limit=10)
|
|
# Only warning should be written (handler level is WARNING)
|
|
warning_logs = [l for l in logs if l["logger_name"] == "test_handler_level"]
|
|
assert len(warning_logs) == 1
|
|
assert warning_logs[0]["level"] == "WARNING"
|
|
|
|
# Cleanup
|
|
logger.removeHandler(handler)
|