"""Integration tests for MCP runtime tools via FastMCP Client. These tests verify the runtime MCP tools work correctly end-to-end, using a subprocess-based XML-RPC server (no Docker required). Run with: pytest tests/integration/test_mcp_runtime.py -v """ from __future__ import annotations import socket import subprocess import sys import textwrap import time from contextlib import closing from pathlib import Path from typing import Any, Generator import pytest from fastmcp import Client, FastMCP from gnuradio_mcp.middlewares.xmlrpc import XmlRpcMiddleware from gnuradio_mcp.providers.mcp_runtime import McpRuntimeProvider from gnuradio_mcp.providers.runtime import RuntimeProvider def extract_raw_value(result) -> Any: """Extract raw value from FastMCP result. When a tool returns a non-Pydantic value (like int, float, bool), FastMCP serializes it as TextContent. This helper parses it back. """ if result.data is not None: return result.data if result.content and len(result.content) > 0: text = result.content[0].text # Try to parse as float first (handles scientific notation) try: return float(text) except ValueError: pass # Try int try: return int(text) except ValueError: pass # Return as string return text return None def find_free_port() -> int: """Find an available port on localhost.""" with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] @pytest.fixture def xmlrpc_server_script(tmp_path: Path) -> Path: """Create a test XML-RPC server script that mimics GNU Radio.""" script = tmp_path / "test_xmlrpc_server.py" script.write_text( textwrap.dedent( '''\ #!/usr/bin/env python3 """Test XML-RPC server mimicking GNU Radio flowgraph interface.""" import os from xmlrpc.server import SimpleXMLRPCServer PORT = int(os.environ.get("XMLRPC_PORT", 8080)) _variables = { "frequency": 101.1e6, "amplitude": 0.5, "gain": 10, } def get_frequency(): return _variables["frequency"] def set_frequency(val): _variables["frequency"] = float(val) def get_amplitude(): return _variables["amplitude"] def set_amplitude(val): _variables["amplitude"] = float(val) def get_gain(): return _variables["gain"] def set_gain(val): _variables["gain"] = int(val) def start(): pass def stop(): pass def lock(): pass def unlock(): pass def main(): server = SimpleXMLRPCServer(("0.0.0.0", PORT), allow_none=True) server.register_introspection_functions() server.register_function(get_frequency) server.register_function(set_frequency) server.register_function(get_amplitude) server.register_function(set_amplitude) server.register_function(get_gain) server.register_function(set_gain) server.register_function(start) server.register_function(stop) server.register_function(lock) server.register_function(unlock) server.serve_forever() if __name__ == "__main__": main() ''' ) ) return script @pytest.fixture def xmlrpc_server( xmlrpc_server_script: Path, ) -> Generator[tuple[subprocess.Popen, int], None, None]: """Start the XML-RPC server subprocess.""" port = find_free_port() env = {**dict(__import__("os").environ), "XMLRPC_PORT": str(port)} proc = subprocess.Popen( [sys.executable, str(xmlrpc_server_script)], env=env, stderr=subprocess.PIPE, stdout=subprocess.PIPE, ) # Wait for server to be ready deadline = time.time() + 10 while time.time() < deadline: if proc.poll() is not None: stdout, stderr = proc.communicate() raise RuntimeError(f"Server exited: {stderr.decode()} {stdout.decode()}") try: sock = socket.create_connection(("127.0.0.1", port), timeout=0.5) sock.close() break except (ConnectionRefusedError, OSError): time.sleep(0.1) else: proc.kill() raise RuntimeError("XML-RPC server did not start in time") yield proc, port proc.terminate() try: proc.wait(timeout=5) except subprocess.TimeoutExpired: proc.kill() @pytest.fixture def runtime_mcp_app() -> FastMCP: """Create FastMCP app with runtime tools (no Docker).""" app = FastMCP("gr-mcp-runtime-test") # RuntimeProvider without Docker — XML-RPC tools still available provider = RuntimeProvider(docker_mw=None) McpRuntimeProvider(app, provider) return app @pytest.fixture async def runtime_client(runtime_mcp_app: FastMCP): """Create FastMCP client for runtime tools. Automatically enables runtime mode so runtime tools are available. """ async with Client(runtime_mcp_app) as client: # Enable runtime mode to register runtime tools dynamically await client.call_tool(name="enable_runtime_mode") yield client class TestRuntimeMcpToolsNoConnection: """Test runtime tools before connecting to a server.""" async def test_get_status_not_connected(self, runtime_client: Client): """get_status should work without connection, showing disconnected state.""" result = await runtime_client.call_tool(name="get_status") assert result.data is not None assert result.data.connected is False assert result.data.connection is None async def test_list_variables_requires_connection(self, runtime_client: Client): """list_variables should raise when not connected.""" with pytest.raises(Exception, match="Not connected"): await runtime_client.call_tool(name="list_variables") async def test_get_variable_requires_connection(self, runtime_client: Client): """get_variable should raise when not connected.""" with pytest.raises(Exception, match="Not connected"): await runtime_client.call_tool( name="get_variable", arguments={"name": "frequency"} ) async def test_disconnect_idempotent(self, runtime_client: Client): """disconnect should succeed even when not connected.""" result = await runtime_client.call_tool(name="disconnect") assert result.data is True class TestRuntimeMcpToolsConnected: """Test runtime tools connected to XML-RPC server.""" async def test_connect_success( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test connecting to XML-RPC server via MCP tool.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" result = await runtime_client.call_tool(name="connect", arguments={"url": url}) assert result.data is not None assert result.data.url == url assert "get_frequency" in result.data.methods async def test_connect_updates_status( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """After connecting, status should show connected.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) result = await runtime_client.call_tool(name="get_status") assert result.data.connected is True assert result.data.connection.url == url async def test_list_variables( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test listing variables after connecting.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) result = await runtime_client.call_tool(name="list_variables") assert result.data is not None names = {v.name for v in result.data} assert "frequency" in names assert "amplitude" in names assert "gain" in names async def test_get_variable( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test getting a variable value.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) result = await runtime_client.call_tool( name="get_variable", arguments={"name": "frequency"} ) # get_variable returns raw values (float), not Pydantic models assert extract_raw_value(result) == 101.1e6 async def test_set_variable( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test setting a variable value.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) # Set new value set_result = await runtime_client.call_tool( name="set_variable", arguments={"name": "frequency", "value": 107.2e6} ) assert set_result.data is True # Verify it was set get_result = await runtime_client.call_tool( name="get_variable", arguments={"name": "frequency"} ) assert extract_raw_value(get_result) == 107.2e6 async def test_flowgraph_control_start( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test starting the flowgraph.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) result = await runtime_client.call_tool(name="start") assert result.data is True async def test_flowgraph_control_stop( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test stopping the flowgraph.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) result = await runtime_client.call_tool(name="stop") assert result.data is True async def test_flowgraph_control_lock_unlock( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test lock/unlock sequence.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) lock_result = await runtime_client.call_tool(name="lock") assert lock_result.data is True unlock_result = await runtime_client.call_tool(name="unlock") assert unlock_result.data is True async def test_disconnect_clears_connection( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test disconnecting clears the connection state.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) await runtime_client.call_tool(name="disconnect") # Status should show disconnected result = await runtime_client.call_tool(name="get_status") assert result.data.connected is False class TestRuntimeMcpToolsFullWorkflow: """End-to-end workflow tests.""" async def test_tuning_workflow( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Test a complete tuning workflow: connect, read, tune, verify.""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" # Connect await runtime_client.call_tool(name="connect", arguments={"url": url}) # Read initial frequency initial = await runtime_client.call_tool( name="get_variable", arguments={"name": "frequency"} ) assert extract_raw_value(initial) == 101.1e6 # Tune to new frequency with lock/unlock await runtime_client.call_tool(name="lock") await runtime_client.call_tool( name="set_variable", arguments={"name": "frequency", "value": 98.5e6} ) await runtime_client.call_tool(name="unlock") # Verify final = await runtime_client.call_tool( name="get_variable", arguments={"name": "frequency"} ) assert extract_raw_value(final) == 98.5e6 # Disconnect await runtime_client.call_tool(name="disconnect") async def test_scan_and_tune_workflow( self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] ): """Simulate scanning through frequencies (mimics FM scanner use case).""" _, port = xmlrpc_server url = f"http://127.0.0.1:{port}" await runtime_client.call_tool(name="connect", arguments={"url": url}) # Scan through several frequencies test_frequencies = [88.1e6, 91.5e6, 95.7e6, 101.1e6, 107.9e6] for freq in test_frequencies: await runtime_client.call_tool( name="set_variable", arguments={"name": "frequency", "value": freq} ) result = await runtime_client.call_tool( name="get_variable", arguments={"name": "frequency"} ) assert extract_raw_value(result) == freq await runtime_client.call_tool(name="disconnect")