"""Comprehensive unit tests for MCP Server functionality.""" import asyncio import json import pytest from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock from datetime import datetime, timedelta from mcmqtt.mcp.server import MCMQTTServer from mcmqtt.mqtt.types import MQTTConfig, MQTTQoS, MQTTConnectionState, MQTTMessage, MQTTStats from mcmqtt.mqtt.client import MQTTClient from mcmqtt.mqtt.publisher import MQTTPublisher from mcmqtt.mqtt.subscriber import MQTTSubscriber from mcmqtt.broker.manager import BrokerManager, BrokerInfo, BrokerConfig class TestMCMQTTServer: """Test cases for MCMQTTServer class.""" @pytest.fixture def mqtt_config(self): """Create a test MQTT configuration.""" return MQTTConfig( broker_host="localhost", broker_port=1883, client_id="test_mcp_client", username="test_user", password="test_pass", keepalive=60, qos=MQTTQoS.AT_LEAST_ONCE ) @pytest.fixture def mock_broker_manager(self): """Create a mock broker manager.""" manager = MagicMock(spec=BrokerManager) manager.is_available.return_value = True manager.spawn_broker = AsyncMock() manager.stop_broker = AsyncMock() manager.list_brokers = AsyncMock() manager.get_broker_status = AsyncMock() manager.stop_all = AsyncMock() return manager @pytest.fixture def server(self, mqtt_config, mock_broker_manager): """Create a server instance with mocked dependencies.""" with patch('mcmqtt.mcp.server.BrokerManager', return_value=mock_broker_manager), \ patch.object(MCMQTTServer, 'register_all'): server = MCMQTTServer(mqtt_config, enable_auto_broker=True) return server @pytest.fixture def server_no_auto_broker(self, mqtt_config): """Create a server instance without auto broker.""" with patch('mcmqtt.mcp.server.BrokerManager'), \ patch.object(MCMQTTServer, 'register_all'): server = MCMQTTServer(mqtt_config, enable_auto_broker=False) return server def test_server_initialization_with_auto_broker(self, mqtt_config, mock_broker_manager): """Test server initialization with auto broker enabled.""" with patch('mcmqtt.mcp.server.BrokerManager', return_value=mock_broker_manager), \ patch.object(MCMQTTServer, 'register_all'): server = MCMQTTServer(mqtt_config, enable_auto_broker=True) assert server.mqtt_config == mqtt_config assert server.mqtt_client is None assert server.mqtt_publisher is None assert server.mqtt_subscriber is None assert server.broker_manager == mock_broker_manager assert server.mcp is not None assert server._connection_state == MQTTConnectionState.DISCONNECTED def test_server_initialization_without_auto_broker(self, mqtt_config): """Test server initialization without auto broker.""" with patch('mcmqtt.mcp.server.BrokerManager'), \ patch.object(MCMQTTServer, 'register_all'): server = MCMQTTServer(mqtt_config, enable_auto_broker=False) assert server.mqtt_config == mqtt_config assert server.broker_manager is not None # No middleware should be added when auto_broker is False def test_server_initialization_no_config(self): """Test server initialization without MQTT config.""" with patch('mcmqtt.mcp.server.BrokerManager'), \ patch.object(MCMQTTServer, 'register_all'): server = MCMQTTServer(mqtt_config=None, enable_auto_broker=False) assert server.mqtt_config is None assert server._connection_state == MQTTConnectionState.DISCONNECTED @pytest.mark.asyncio async def test_mqtt_connect_success(self, server, mqtt_config): """Test successful MQTT connection.""" mock_client = MagicMock(spec=MQTTClient) mock_client.connect = AsyncMock(return_value=True) mock_client.is_connected = True mock_client.connection_info = MagicMock() with patch('mcmqtt.mcp.server.MQTTClient', return_value=mock_client), \ patch('mcmqtt.mcp.server.MQTTPublisher') as mock_pub, \ patch('mcmqtt.mcp.server.MQTTSubscriber') as mock_sub: result = await server.connect_to_broker( broker_host="localhost", broker_port=1883, client_id="test_client" ) assert result["success"] is True assert "Connected to MQTT broker" in result["message"] assert server.mqtt_client == mock_client assert server._connection_state == MQTTConnectionState.CONNECTED # Verify client was configured correctly mock_client.connect.assert_called_once() @pytest.mark.asyncio async def test_mqtt_connect_failure(self, server): """Test MQTT connection failure.""" mock_client = MagicMock(spec=MQTTClient) mock_client.connect = AsyncMock(return_value=False) with patch('mcmqtt.mcp.server.MQTTClient', return_value=mock_client): result = await server.connect_to_broker( broker_host="localhost", broker_port=1883, client_id="test_client" ) assert result["success"] is False assert "Failed to connect" in result["message"] assert server._connection_state == MQTTConnectionState.ERROR @pytest.mark.asyncio async def test_mqtt_connect_with_existing_client(self, server): """Test MQTT connect when client already exists.""" # Set up existing client existing_client = MagicMock(spec=MQTTClient) existing_client.disconnect = AsyncMock(return_value=True) server.mqtt_client = existing_client mock_new_client = AsyncMock() mock_new_client.connect = AsyncMock(return_value=True) mock_new_client.is_connected = True mock_publisher = MagicMock() with patch('mcmqtt.mcp.server.MQTTClient', return_value=mock_new_client), \ patch('mcmqtt.mcp.server.MQTTPublisher', return_value=mock_publisher): result = await server.connect_to_broker( broker_host="localhost", broker_port=1883, client_id="test_client" ) # The implementation replaces the client without disconnecting the old one # (this is the actual behavior, not necessarily ideal) assert server.mqtt_client == mock_new_client assert result["success"] is True assert result["client_id"] == "test_client" @pytest.mark.asyncio async def test_mqtt_disconnect_success(self, server): """Test successful MQTT disconnection.""" mock_client = AsyncMock() mock_client.disconnect = AsyncMock(return_value=True) server.mqtt_client = mock_client server._connection_state = MQTTConnectionState.CONNECTED result = await server.disconnect_from_broker() assert result["success"] is True assert result["message"] == "Disconnected from MQTT broker" assert result["connection_state"] == MQTTConnectionState.DISCONNECTED.value mock_client.disconnect.assert_called_once() assert server._connection_state == MQTTConnectionState.DISCONNECTED @pytest.mark.asyncio async def test_mqtt_disconnect_no_client(self, server): """Test MQTT disconnect when no client exists.""" result = await server.disconnect_from_broker() # Implementation returns success: True even when no client exists (idempotent) assert result["success"] is True assert result["message"] == "Disconnected from MQTT broker" @pytest.mark.asyncio async def test_mqtt_publish_success(self, server): """Test successful MQTT message publishing.""" # Mock the MQTT client and set connected state mock_client = AsyncMock() mock_client.publish = AsyncMock(return_value=True) server.mqtt_client = mock_client server.mqtt_publisher = MagicMock() # Must exist for the check server._connection_state = MQTTConnectionState.CONNECTED result = await server.publish_message( topic="test/topic", payload="test message", qos=1, retain=False ) assert result["success"] is True assert result["topic"] == "test/topic" assert result["message"] == "Published message to test/topic" mock_client.publish.assert_called_once_with( topic="test/topic", payload="test message", qos=MQTTQoS.AT_LEAST_ONCE, retain=False ) @pytest.mark.asyncio async def test_mqtt_publish_no_client(self, server): """Test MQTT publish when no client exists.""" result = await server.publish_message( topic="test/topic", payload="test message" ) assert result["success"] is False assert result["message"] == "Not connected to MQTT broker" @pytest.mark.asyncio async def test_mqtt_publish_json_payload(self, server): """Test MQTT publish with JSON payload.""" # Mock the MQTT client and set connected state mock_client = AsyncMock() mock_client.publish = AsyncMock(return_value=True) server.mqtt_client = mock_client server.mqtt_publisher = MagicMock() # Must exist for the check server._connection_state = MQTTConnectionState.CONNECTED test_data = {"temperature": 22.5, "humidity": 60} result = await server.publish_message( topic="sensors/room1", payload=test_data ) assert result["success"] is True assert result["topic"] == "sensors/room1" mock_client.publish.assert_called_once() @pytest.mark.asyncio async def test_mqtt_subscribe_success(self, server): """Test successful MQTT subscription.""" # Mock the MQTT client and set connected state mock_client = AsyncMock() mock_client.subscribe = AsyncMock(return_value=True) server.mqtt_client = mock_client server._connection_state = MQTTConnectionState.CONNECTED result = await server.subscribe_to_topic( topic="test/topic", qos=1 ) assert result["success"] is True assert result["topic"] == "test/topic" mock_client.subscribe.assert_called_once() @pytest.mark.asyncio async def test_mqtt_unsubscribe_success(self, server): """Test successful MQTT unsubscription.""" # Mock the MQTT client and set connected state mock_client = AsyncMock() mock_client.unsubscribe = AsyncMock(return_value=True) server.mqtt_client = mock_client server._connection_state = MQTTConnectionState.CONNECTED result = await server.unsubscribe_from_topic(topic="test/topic") assert result["success"] is True assert result["topic"] == "test/topic" mock_client.unsubscribe.assert_called_once_with("test/topic") @pytest.mark.asyncio async def test_mqtt_status_connected(self, server): """Test MQTT status when connected.""" mock_client = MagicMock(spec=MQTTClient) mock_client.is_connected = True mock_client.get_subscriptions.return_value = {"test/topic": MQTTQoS.AT_LEAST_ONCE} mock_stats = MQTTStats() mock_stats.messages_sent = 10 mock_stats.messages_received = 5 mock_stats.bytes_sent = 100 mock_stats.bytes_received = 50 mock_stats.topics_subscribed = 1 mock_stats.connection_uptime = 30.0 mock_stats.last_message_time = None mock_client.stats = mock_stats server.mqtt_client = mock_client server._connection_state = MQTTConnectionState.CONNECTED result = await server.get_status() assert result["connection_state"] == "connected" assert result["statistics"]["messages_sent"] == 10 assert result["statistics"]["messages_received"] == 5 assert result["subscriptions"] == ["test/topic"] assert result["message_count"] == 0 @pytest.mark.asyncio async def test_mqtt_status_disconnected(self, server): """Test MQTT status when disconnected.""" result = await server.get_status() assert result["connection_state"] == MQTTConnectionState.DISCONNECTED.value assert result["statistics"] == {} assert result["subscriptions"] == [] assert result["message_count"] == 0 @pytest.mark.asyncio async def test_mqtt_get_messages(self, server): """Test getting MQTT messages.""" # Set up message store directly (the actual implementation uses this) server._message_store = [ { "topic": "test/topic1", "payload": "payload1", "qos": 1, "received_at": datetime.utcnow() }, { "topic": "test/topic2", "payload": "payload2", "qos": 0, "received_at": datetime.utcnow() } ] result = await server.get_messages(limit=10) assert result["success"] is True assert len(result["messages"]) == 2 # Check that both topics are present (order may vary due to sorting) topics = [msg["topic"] for msg in result["messages"]] assert "test/topic1" in topics assert "test/topic2" in topics @pytest.mark.asyncio async def test_mqtt_list_subscriptions(self, server): """Test listing MQTT subscriptions.""" mock_subscriber = MagicMock(spec=MQTTSubscriber) mock_subscriber.get_all_subscriptions.return_value = { "test/topic1": MagicMock(topic="test/topic1", qos=MQTTQoS.AT_LEAST_ONCE), "test/topic2": MagicMock(topic="test/topic2", qos=MQTTQoS.AT_MOST_ONCE) } server.mqtt_subscriber = mock_subscriber result = await server.list_subscriptions() assert result["success"] is True assert len(result["subscriptions"]) == 2 @pytest.mark.asyncio async def test_broker_spawn_success(self, server, mock_broker_manager): """Test successful broker spawning.""" broker_info = BrokerInfo( broker_id="test-broker-123", config=BrokerConfig(name="test-broker", port=1883), status="running", url="mqtt://localhost:1883", pid=12345, started_at=datetime.now(), connections=0 ) mock_broker_manager.spawn_broker.return_value = "test-broker-123" mock_broker_manager.get_broker_status.return_value = broker_info result = await server.spawn_mqtt_broker( port=1883, name="test-broker", max_connections=100 ) assert result["success"] is True assert result["broker_id"] == "test-broker-123" assert result["url"] == "mqtt://localhost:1883" @pytest.mark.asyncio async def test_broker_stop_success(self, server, mock_broker_manager): """Test successful broker stopping.""" mock_broker_manager.stop_broker.return_value = True result = await server.stop_mqtt_broker(broker_id="test-broker-123") assert result["success"] is True assert "Broker stopped successfully" in result["message"] @pytest.mark.asyncio async def test_broker_list(self, server, mock_broker_manager): """Test listing brokers.""" broker_info = BrokerInfo( broker_id="test-broker-123", config=BrokerConfig(name="test-broker", port=1883), status="running", url="mqtt://localhost:1883", pid=12345, started_at=datetime.now(), connections=2 ) mock_broker_manager.list_brokers.return_value = [broker_info] result = await server.list_mqtt_brokers(running_only=False) assert result["success"] is True assert len(result["brokers"]) == 1 assert result["brokers"][0]["broker_id"] == "test-broker-123" @pytest.mark.asyncio async def test_broker_status(self, server, mock_broker_manager): """Test getting broker status.""" broker_info = BrokerInfo( broker_id="test-broker-123", config=BrokerConfig(name="test-broker", port=1883), status="running", url="mqtt://localhost:1883", pid=12345, started_at=datetime.now(), connections=2 ) mock_broker_manager.get_broker_status.return_value = broker_info result = await server.get_mqtt_broker_status(broker_id="test-broker-123") assert result["success"] is True assert result["broker_id"] == "test-broker-123" assert result["status"] == "running" assert result["connections"] == 2 @pytest.mark.asyncio async def test_broker_stop_all(self, server, mock_broker_manager): """Test stopping all brokers.""" mock_broker_manager.stop_all.return_value = 3 # Number of brokers stopped result = await server.stop_all_mqtt_brokers() assert result["success"] is True assert result["brokers_stopped"] == 3 # Resource tests @pytest.mark.asyncio async def test_get_config_resource(self, server, mqtt_config): """Test getting config resource.""" server.mqtt_config = mqtt_config result = await server.get_config_resource() assert result["broker_host"] == "localhost" assert result["broker_port"] == 1883 assert result["client_id"] == "test_mcp_client" # Should not expose sensitive data assert "password" not in result @pytest.mark.asyncio async def test_get_statistics_resource(self, server): """Test getting statistics resource.""" mock_client = MagicMock(spec=MQTTClient) mock_stats = MQTTStats() mock_stats.messages_sent = 100 mock_stats.messages_received = 50 mock_client.stats = mock_stats server.mqtt_client = mock_client result = await server.get_stats_resource() assert result["messages_sent"] == 100 assert result["messages_received"] == 50 @pytest.mark.asyncio async def test_get_subscriptions_resource(self, server): """Test getting subscriptions resource.""" mock_subscriber = MagicMock(spec=MQTTSubscriber) mock_subscriber.get_all_subscriptions.return_value = { "sensors/+": MagicMock(topic="sensors/+", qos=MQTTQoS.AT_LEAST_ONCE) } server.mqtt_subscriber = mock_subscriber result = await server.get_subscriptions_resource() assert len(result["subscriptions"]) == 1 assert result["subscriptions"][0]["topic"] == "sensors/+" @pytest.mark.asyncio async def test_get_messages_resource(self, server): """Test getting messages resource.""" mock_subscriber = MagicMock(spec=MQTTSubscriber) msg = MQTTMessage("test/topic", "payload", MQTTQoS.AT_LEAST_ONCE) mock_subscriber.get_buffered_messages.return_value = [msg] server.mqtt_subscriber = mock_subscriber result = await server.get_messages_resource() assert len(result["messages"]) == 1 assert result["messages"][0]["topic"] == "test/topic" @pytest.mark.asyncio async def test_get_health_resource(self, server): """Test getting health resource.""" mock_client = MagicMock(spec=MQTTClient) mock_client.is_connected = True server.mqtt_client = mock_client server._connection_state = MQTTConnectionState.CONNECTED result = await server.get_health_resource() assert result["status"] == "healthy" assert result["mqtt_connected"] is True @pytest.mark.asyncio async def test_get_brokers_resource(self, server, mock_broker_manager): """Test getting brokers resource.""" broker_info = BrokerInfo( broker_id="test-broker-123", config=BrokerConfig(name="test-broker", port=1883), status="running", url="mqtt://localhost:1883", pid=12345, started_at=datetime.now(), connections=2 ) mock_broker_manager.list_brokers.return_value = [broker_info] result = await server.get_brokers_resource() assert len(result["brokers"]) == 1 assert result["brokers"][0]["broker_id"] == "test-broker-123" def test_server_string_representation(self, server): """Test server string representation.""" str_repr = str(server) assert "MCMQTTServer" in str_repr assert "CONFIGURED" in str_repr def test_cleanup_components(self, server): """Test component cleanup method.""" mock_client = MagicMock() mock_publisher = MagicMock() mock_subscriber = MagicMock() server.mqtt_client = mock_client server.mqtt_publisher = mock_publisher server.mqtt_subscriber = mock_subscriber server._cleanup_components() assert server.mqtt_client is None assert server.mqtt_publisher is None assert server.mqtt_subscriber is None if __name__ == "__main__": pytest.main([__file__])