mcmqtt/tests/test_main.py
Ryan Malloy 8ab61eb1df 🚀 Initial release: mcmqtt FastMCP MQTT Server v2025.09.17
Complete FastMCP MQTT integration server featuring:

Core Features:
- FastMCP native Model Context Protocol server with MQTT tools
- Embedded MQTT broker support with zero-configuration spawning
- Modular architecture: CLI, config, logging, server, MQTT, MCP, broker
- Comprehensive testing: 70+ tests with 96%+ coverage
- Cross-platform support: Linux, macOS, Windows

🏗️ Architecture:
- Clean separation of concerns across 7 modules
- Async/await patterns throughout for maximum performance
- Pydantic models with validation and configuration management
- AMQTT pure Python embedded brokers
- Typer CLI framework with rich output formatting

🧪 Quality Assurance:
- pytest-cov with HTML reporting
- Comprehensive unit testing with AsyncMock
- Edge case coverage for production reliability
- Pre-commit hooks with black, ruff, mypy

📦 Production Ready:
- PyPI package with proper metadata
- MIT License
- Professional documentation
- uvx installation support
- MCP client integration examples

Perfect for AI agent coordination, IoT data collection, and
microservice communication with MQTT messaging patterns.
2025-09-17 05:46:08 -06:00

394 lines
13 KiB
Python

"""Tests for the main CLI application."""
import os
import tempfile
from unittest.mock import patch, MagicMock
import pytest
import typer
from typer.testing import CliRunner
from mcmqtt.main import app, main, create_mqtt_config_from_env, get_version
class TestCLI:
    """Drive the Typer ``app`` through ``CliRunner`` and check each command."""

    def test_cli_app_creation(self):
        """``app`` is a ``typer.Typer`` whose info name is ``mcmqtt``."""
        assert isinstance(app, typer.Typer)
        assert app.info.name == "mcmqtt"

    def test_version_command(self):
        """``version`` exits with 0 and prints the version label."""
        outcome = CliRunner().invoke(app, ["version"])

        assert outcome.exit_code == 0
        assert "mcmqtt version:" in outcome.stdout

    def test_config_command(self):
        """``config`` shows the broker host supplied through the environment."""
        broker_env = {
            "MQTT_BROKER_HOST": "test.broker.com",
            "MQTT_BROKER_PORT": "1883",
            "MQTT_CLIENT_ID": "test-client",
        }
        with patch.dict(os.environ, broker_env):
            outcome = CliRunner().invoke(app, ["config"])

        assert outcome.exit_code == 0
        assert "Configuration Sources:" in outcome.stdout
        assert "test.broker.com" in outcome.stdout

    def test_health_command_connection_error(self):
        """``health`` exits with 1 and reports a connection failure."""
        # Point the command at a server that is not expected to exist.
        unreachable_args = ["health", "--host", "nonexistent", "--port", "9999"]
        outcome = CliRunner().invoke(app, unreachable_args)

        assert outcome.exit_code == 1
        assert "Cannot connect to server" in outcome.stdout

    @patch('mcmqtt.main.uvicorn.Server')
    @patch('mcmqtt.main.MCMQTTServer')
    def test_serve_command_basic(self, mock_server_class, mock_uvicorn_server):
        """``serve`` with host/port exits with 0 and builds one ``MCMQTTServer``."""
        mock_server_class.return_value = MagicMock()
        mock_uvicorn_server.return_value = MagicMock()
        serve_args = ["serve", "--host", "localhost", "--port", "3000"]

        # asyncio.run is replaced so that no server is actually started.
        with patch('asyncio.run'):
            outcome = CliRunner().invoke(app, serve_args)

        assert outcome.exit_code == 0
        mock_server_class.assert_called_once()

    def test_serve_command_with_mqtt_config(self):
        """``serve`` accepts the MQTT host, port and client-id options."""
        serve_args = [
            "serve",
            "--mqtt-host", "localhost",
            "--mqtt-port", "1883",
            "--mqtt-client-id", "test-client",
        ]
        with patch('asyncio.run'):
            outcome = CliRunner().invoke(app, serve_args)

        assert outcome.exit_code == 0

    def test_serve_command_with_auto_connect(self):
        """``serve`` accepts ``--auto-connect`` together with an MQTT host."""
        serve_args = ["serve", "--mqtt-host", "localhost", "--auto-connect"]
        with patch('asyncio.run'):
            outcome = CliRunner().invoke(app, serve_args)

        assert outcome.exit_code == 0
class TestConfigurationFunctions:
    """Test configuration utility functions.

    Every test that builds a config from the environment replaces
    ``os.environ`` wholesale (``clear=True``). Without that, ``MQTT_*``
    variables already set on the developer or CI machine would leak into the
    config under test and break the default-value assertions.
    """

    def test_get_version_default(self):
        """Test version function with default fallback."""
        # Make the version lookup used by mcmqtt.main fail so the fallback is hit.
        with patch('mcmqtt.main.version', side_effect=Exception("No version")):
            version = get_version()
        assert version == "0.1.0"

    def test_create_mqtt_config_from_env_complete(self):
        """Test creating MQTT config from complete environment."""
        env_vars = {
            "MQTT_BROKER_HOST": "test.broker.com",
            "MQTT_BROKER_PORT": "1883",
            "MQTT_CLIENT_ID": "test-client",
            "MQTT_USERNAME": "testuser",
            "MQTT_PASSWORD": "testpass",
            "MQTT_KEEPALIVE": "30",
            "MQTT_QOS": "2",
            "MQTT_USE_TLS": "true",
            "MQTT_CLEAN_SESSION": "false",
            "MQTT_RECONNECT_INTERVAL": "10",
            "MQTT_MAX_RECONNECT_ATTEMPTS": "5",
        }
        with patch.dict(os.environ, env_vars, clear=True):
            config = create_mqtt_config_from_env()

        assert config is not None
        assert config.broker_host == "test.broker.com"
        assert config.broker_port == 1883
        assert config.client_id == "test-client"
        assert config.username == "testuser"
        assert config.password == "testpass"
        assert config.keepalive == 30
        assert config.qos.value == 2
        assert config.use_tls is True
        assert config.clean_session is False
        assert config.reconnect_interval == 10
        assert config.max_reconnect_attempts == 5

    def test_create_mqtt_config_from_env_minimal(self):
        """Test creating MQTT config with minimal environment."""
        env_vars = {
            "MQTT_BROKER_HOST": "minimal.broker.com",
        }
        # clear=True is essential here: the assertions below check defaults,
        # which an ambient MQTT_BROKER_PORT / MQTT_USERNAME / ... would override.
        with patch.dict(os.environ, env_vars, clear=True):
            config = create_mqtt_config_from_env()

        assert config is not None
        assert config.broker_host == "minimal.broker.com"
        assert config.broker_port == 1883  # Default
        assert config.client_id.startswith("mcmqtt-")  # Generated
        assert config.username is None
        assert config.password is None

    def test_create_mqtt_config_from_env_missing_host(self):
        """Test creating MQTT config without required host."""
        with patch.dict(os.environ, {}, clear=True):
            config = create_mqtt_config_from_env()
        assert config is None

    def test_create_mqtt_config_from_env_invalid_values(self):
        """Test creating MQTT config with invalid values."""
        env_vars = {
            "MQTT_BROKER_HOST": "test.broker.com",
            "MQTT_BROKER_PORT": "invalid_port",
            "MQTT_QOS": "5",  # Invalid QoS
        }
        with patch.dict(os.environ, env_vars, clear=True):
            config = create_mqtt_config_from_env()

        # Should return None due to invalid values
        assert config is None
class TestLogging:
    """Test logging configuration.

    The tests below expect ``setup_logging`` to change the root logger's
    level, which is process-wide state. An autouse fixture puts the original
    level back after each test so the change does not leak into other tests.
    """

    @pytest.fixture(autouse=True)
    def _restore_root_logger_level(self):
        """Save the root logger level before a test and restore it afterwards."""
        import logging

        root_logger = logging.getLogger()
        saved_level = root_logger.level
        yield
        root_logger.setLevel(saved_level)

    def test_setup_logging_info_level(self):
        """Test logging setup with INFO level."""
        from mcmqtt.main import setup_logging
        setup_logging("INFO")

        # Test that logger is configured
        import logging
        logger = logging.getLogger()
        assert logger.level == logging.INFO

    def test_setup_logging_debug_level(self):
        """Test logging setup with DEBUG level."""
        from mcmqtt.main import setup_logging
        setup_logging("DEBUG")

        import logging
        logger = logging.getLogger()
        assert logger.level == logging.DEBUG

    def test_setup_logging_invalid_level(self):
        """Test logging setup with invalid level."""
        from mcmqtt.main import setup_logging

        # Should not raise exception, will use default
        setup_logging("INVALID")
class TestServerLifecycle:
    """Check that ``MCMQTTServer`` can be constructed, initialized and torn down."""

    @pytest.mark.asyncio
    async def test_server_startup_with_config(self):
        """A server built from an ``MQTTConfig`` initializes its MQTT client."""
        from mcmqtt.main import MCMQTTServer
        from mcmqtt.mqtt.types import MQTTConfig

        server = MCMQTTServer(
            MQTTConfig(
                broker_host="localhost",
                broker_port=1883,
                client_id="test-startup",
            )
        )

        # Initialization must report success ...
        initialized = await server.initialize_mqtt_client()
        assert initialized

        # ... after which the client is disconnected again as cleanup.
        await server.disconnect_mqtt()

    @pytest.mark.asyncio
    async def test_server_startup_without_config(self):
        """A server built without any MQTT config still exposes ``mcp``."""
        from mcmqtt.main import MCMQTTServer

        bare_server = MCMQTTServer()
        assert bare_server.mcp is not None

    def test_main_function_exists(self):
        """``main`` is importable and callable."""
        assert callable(main)
class TestErrorHandling:
    """Test error handling in CLI."""

    def test_serve_with_invalid_port(self):
        """Test serve command with invalid port.

        This test makes no assertion; it only requires that invoking the
        command through ``CliRunner`` returns.
        """
        runner = CliRunner()
        runner.invoke(app, [
            "serve",
            "--port", "99999",  # Invalid port number
        ])
        # Should handle gracefully (typer validates port range)
        # Actual behavior depends on typer validation

    def test_serve_keyboard_interrupt(self):
        """Test graceful shutdown on keyboard interrupt."""
        runner = CliRunner()

        # Passing the exception class makes the mock raise it however it is
        # called. A zero-argument function as side_effect would instead fail
        # with TypeError, because Mock forwards the call's arguments (the
        # coroutine given to asyncio.run) to a callable side_effect.
        with patch('asyncio.run', side_effect=KeyboardInterrupt):
            result = runner.invoke(app, ["serve"])

        assert result.exit_code == 0
        assert "stopped" in result.stdout.lower()

    def test_health_command_invalid_response(self):
        """Test health command with invalid server response."""
        runner = CliRunner()

        # Mock httpx to return invalid response
        with patch('httpx.get') as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 500
            mock_get.return_value = mock_response
            result = runner.invoke(app, ["health"])

        assert result.exit_code == 1
        assert "unhealthy" in result.stdout
class TestEnvironmentHandling:
    """Check how the ``config`` command renders environment variables."""

    def test_environment_variable_display(self):
        """Values set in the environment appear in the ``config`` output."""
        overrides = {
            "MQTT_BROKER_HOST": "env.broker.com",
            "MQTT_CLIENT_ID": "env-client",
            "LOG_LEVEL": "DEBUG",
        }
        with patch.dict(os.environ, overrides):
            outcome = CliRunner().invoke(app, ["config"])

        assert outcome.exit_code == 0
        for expected in ("env.broker.com", "env-client", "DEBUG"):
            assert expected in outcome.stdout

    def test_password_masking_in_config(self):
        """The password never shows in clear text; a ``***`` mask does."""
        overrides = {
            "MQTT_BROKER_HOST": "test.broker.com",
            "MQTT_PASSWORD": "secret123",
        }
        with patch.dict(os.environ, overrides):
            outcome = CliRunner().invoke(app, ["config"])

        assert outcome.exit_code == 0
        assert "secret123" not in outcome.stdout
        assert "***" in outcome.stdout
class TestSignalHandling:
    """Exercise the ``serve`` command when the server object is set up to fail."""

    @patch('mcmqtt.main.MCMQTTServer')
    def test_graceful_shutdown_on_exception(self, mock_server_class):
        """Invoke ``serve`` with a server whose ``run_server`` raises.

        The test makes no assertion on the result; it only requires that the
        invocation returns.
        """
        async def failing_run_server(*args):
            raise Exception("Server error")

        failing_server = MagicMock()
        failing_server.run_server = failing_run_server
        mock_server_class.return_value = failing_server

        # NOTE(review): asyncio.run is mocked out here, so the failing
        # coroutine is presumably never awaited -- confirm this test actually
        # reaches the error path it is named after.
        with patch('asyncio.run'):
            CliRunner().invoke(app, ["serve"])

        # The exception is expected to be handled gracefully; the exit code
        # depends on the implementation.
class TestBannerAndOutput:
    """Check the startup banner and the formatting of command output."""

    @patch('mcmqtt.main.get_version')
    def test_startup_banner_display(self, mock_version):
        """``serve`` prints a banner containing the (mocked) version."""
        mock_version.return_value = "1.0.0"

        with patch('asyncio.run'):
            outcome = CliRunner().invoke(app, ["serve"])

        assert "mcmqtt FastMCP MQTT Server v1.0.0" in outcome.stdout

    def test_rich_console_output(self):
        """``mcmqtt.main.console`` is a ``rich`` ``Console`` instance."""
        from mcmqtt.main import console
        from rich.console import Console

        assert isinstance(console, Console)

    def test_config_output_formatting(self):
        """``config`` prints both of its section headings."""
        with patch.dict(os.environ, {"MQTT_BROKER_HOST": "test.com"}):
            outcome = CliRunner().invoke(app, ["config"])

        assert outcome.exit_code == 0
        for heading in ("Configuration Sources:", "Environment Variables:"):
            assert heading in outcome.stdout