diff --git a/tests/integration/test_fm_scanner.py b/tests/integration/test_fm_scanner.py new file mode 100644 index 0000000..41ec2f1 --- /dev/null +++ b/tests/integration/test_fm_scanner.py @@ -0,0 +1,306 @@ +"""Integration tests for FM scanner signal probe functionality. + +Tests the programmatic flowgraph construction and signal probe features +added to the FM scanner. Requires GNU Radio but not RTL-SDR hardware. + +Run with: pytest tests/integration/test_fm_scanner.py -v +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +# Add examples to path so we can import fm_scanner +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "examples")) + +# Check if GNU Radio is available +try: + from gnuradio import gr + + GNURADIO_AVAILABLE = True +except ImportError: + GNURADIO_AVAILABLE = False + + +class TestSignalProbeHelpers: + """Unit tests for signal probe helper functions (no GNU Radio needed).""" + + def test_mag_squared_to_dbm_normal(self): + """Test dB conversion for normal values.""" + from fm_scanner import mag_squared_to_dbm + + # 1.0 → 0 dB + assert mag_squared_to_dbm(1.0) == 0.0 + + # 0.1 → -10 dB + assert abs(mag_squared_to_dbm(0.1) - (-10.0)) < 0.01 + + # 0.01 → -20 dB + assert abs(mag_squared_to_dbm(0.01) - (-20.0)) < 0.01 + + # 0.001 → -30 dB + assert abs(mag_squared_to_dbm(0.001) - (-30.0)) < 0.01 + + def test_mag_squared_to_dbm_zero(self): + """Test dB conversion handles zero gracefully.""" + from fm_scanner import mag_squared_to_dbm + + # Zero should return floor value, not crash + result = mag_squared_to_dbm(0.0) + assert result == -100.0 + + def test_mag_squared_to_dbm_negative(self): + """Test dB conversion handles negative values (shouldn't happen but be safe).""" + from fm_scanner import mag_squared_to_dbm + + result = mag_squared_to_dbm(-1.0) + assert result == -100.0 + + def test_format_signal_bar_strong(self): + """Test signal bar formatting for strong signals.""" + from fm_scanner import format_signal_bar + + bar = format_signal_bar(-30.0, width=20) + # Should be mostly filled and green + assert "█" in bar + assert "\033[32m" in bar # green color code + + def test_format_signal_bar_medium(self): + """Test signal bar formatting for medium signals.""" + from fm_scanner import format_signal_bar + + bar = format_signal_bar(-50.0, width=20) + # Should be yellow + assert "\033[33m" in bar # yellow color code + + def test_format_signal_bar_weak(self): + """Test signal bar formatting for weak signals.""" + from fm_scanner import format_signal_bar + + bar = format_signal_bar(-70.0, width=20) + # Should be red (weak signal) + assert "\033[31m" in bar # red color code + + def test_format_signal_bar_empty(self): + """Test signal bar formatting at floor.""" + from fm_scanner import format_signal_bar + + bar = format_signal_bar(-80.0, width=20) + # At -80 dB, should be empty (only unfilled blocks) + assert "░" in bar + + +class TestScannerParsing: + """Unit tests for scan data parsing.""" + + def test_parse_scan_valid_csv(self): + """Test parsing valid rtl_power CSV output.""" + from fm_scanner import parse_scan + + csv_data = """\ +2025-01-01, 12:00:00, 87500000, 87700000, 100000, 1, -45.2, -47.1 +2025-01-01, 12:00:01, 87700000, 87900000, 100000, 1, -52.3, -51.8 +""" + readings = parse_scan(csv_data) + + # Should have 4 readings (2 bins per row × 2 rows) + assert len(readings) == 4 + + # Check first reading + freq_mhz, power_dbm = readings[0] + assert 87.5 <= freq_mhz <= 87.6 # First bin + assert power_dbm == -45.2 + + def test_parse_scan_empty(self): + """Test parsing empty CSV.""" + from fm_scanner import parse_scan + + readings = parse_scan("") + assert readings == [] + + def test_parse_scan_malformed(self): + """Test parsing malformed CSV (should skip bad rows).""" + from fm_scanner import parse_scan + + csv_data = """\ +bad data +2025-01-01, 12:00:00, 87500000, 87700000, 100000, 1, -45.2 +more bad data +""" + readings = parse_scan(csv_data) + + # Should parse the valid row (1 bin) + assert len(readings) == 1 + + def test_aggregate_channels(self): + """Test channel aggregation snaps to FM channels.""" + from fm_scanner import aggregate_channels + + # Readings around 101.1 MHz + readings = [ + (101.05, -35.0), + (101.10, -30.0), + (101.15, -32.0), + ] + + channels = aggregate_channels(readings) + + # Should aggregate to one channel around 101.0-101.2 + assert len(channels) >= 1 + + # Find the 101.0 channel + ch101 = next((c for c in channels if 100.9 <= c["freq_mhz"] <= 101.3), None) + assert ch101 is not None + # Max power should be used + assert ch101["power_dbm"] == -30.0 + + def test_detect_stations(self): + """Test station detection above noise floor.""" + from fm_scanner import detect_stations + + channels = [ + {"freq_mhz": 88.1, "power_dbm": -50.0}, # noise + {"freq_mhz": 91.5, "power_dbm": -30.0}, # station! + {"freq_mhz": 93.3, "power_dbm": -48.0}, # noise + {"freq_mhz": 101.1, "power_dbm": -25.0}, # station! + {"freq_mhz": 105.5, "power_dbm": -52.0}, # noise + ] + + stations, noise_floor = detect_stations(channels, threshold_db=10.0) + + # Median of [-50, -30, -48, -25, -52] = -48 + assert -50 < noise_floor < -45 + + # Should detect 2 stations (>10 dB above noise) + assert len(stations) == 2 + + # Strongest should be first + assert stations[0]["freq_mhz"] == 101.1 + assert stations[1]["freq_mhz"] == 91.5 + + +@pytest.mark.skipif(not GNURADIO_AVAILABLE, reason="GNU Radio not available") +class TestFlowgraphConstruction: + """Integration tests for flowgraph construction with signal probe.""" + + def test_build_fm_receiver_creates_grc(self): + """Test that build_fm_receiver creates a valid .grc file.""" + from fm_scanner import build_fm_receiver + + py_path = build_fm_receiver(101.1, gain=10) + + # Should return a path to a Python file + assert py_path.exists() + assert py_path.suffix == ".py" + + # Read and verify it contains expected components + py_code = py_path.read_text() + + # Should have XML-RPC server + assert "SimpleXMLRPCServer" in py_code + + # Should have signal probe (analog_probe_avg_mag_sqrd) + assert "probe_avg_mag_sqrd" in py_code + + # Should have signal_level variable + assert "signal_level" in py_code + + # Should have freq variable + assert "freq" in py_code + + # Should have get_signal_level method + assert "get_signal_level" in py_code + + def test_build_fm_receiver_has_signal_probe_block(self): + """Verify the flowgraph includes the signal probe block.""" + from gnuradio.grc.core.platform import Platform + from gnuradio import gr + + # Initialize platform + platform = Platform( + version=gr.version(), + version_parts=(gr.major_version(), gr.api_version(), gr.minor_version()), + prefs=gr.prefs(), + ) + platform.build_library() + + # Verify the probe block type exists + block_keys = list(platform.blocks.keys()) + assert "analog_probe_avg_mag_sqrd_x" in block_keys + + def test_build_fm_receiver_has_function_probe_block(self): + """Verify the flowgraph includes the variable function probe block.""" + from gnuradio.grc.core.platform import Platform + from gnuradio import gr + + # Initialize platform + platform = Platform( + version=gr.version(), + version_parts=(gr.major_version(), gr.api_version(), gr.minor_version()), + prefs=gr.prefs(), + ) + platform.build_library() + + # Verify the function probe block type exists + block_keys = list(platform.blocks.keys()) + assert "variable_function_probe" in block_keys + + def test_flowgraph_compiled_structure(self): + """Verify the compiled flowgraph has correct structure.""" + from fm_scanner import build_fm_receiver + import ast + + py_path = build_fm_receiver(98.5, gain=20) + py_code = py_path.read_text() + + # Parse as AST to verify structure + tree = ast.parse(py_code) + + # Find the class definition + class_defs = [node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)] + assert len(class_defs) >= 1 + + # Find method definitions + fm_class = class_defs[0] + method_names = [ + node.name for node in fm_class.body if isinstance(node, ast.FunctionDef) + ] + + # Should have get/set methods for freq and signal_level + assert "get_freq" in method_names + assert "set_freq" in method_names + assert "get_signal_level" in method_names + assert "set_signal_level" in method_names + + +@pytest.mark.skipif(not GNURADIO_AVAILABLE, reason="GNU Radio not available") +class TestSignalProbeIntegration: + """Tests for signal probe XML-RPC integration (requires GNU Radio).""" + + def test_compiled_flowgraph_has_xmlrpc(self): + """Verify compiled flowgraph has XML-RPC server setup.""" + from fm_scanner import build_fm_receiver + + py_path = build_fm_receiver(107.2, gain=15) + py_code = py_path.read_text() + + # Should configure XML-RPC on port 8090 + assert "8090" in py_code + assert "0.0.0.0" in py_code or "''" in py_code # Bind address + + def test_signal_probe_connection(self): + """Verify signal probe is connected to the LPF output.""" + from fm_scanner import build_fm_receiver + + py_path = build_fm_receiver(101.1, gain=10) + py_code = py_path.read_text() + + # The probe should be connected (look for connection pattern) + # In generated code, connections are made via self.connect() + assert "signal_probe" in py_code + + # The probe should sample from the low pass filter + assert "low_pass_filter" in py_code diff --git a/tests/integration/test_mcp_runtime.py b/tests/integration/test_mcp_runtime.py new file mode 100644 index 0000000..0b3d9d1 --- /dev/null +++ b/tests/integration/test_mcp_runtime.py @@ -0,0 +1,412 @@ +"""Integration tests for MCP runtime tools via FastMCP Client. + +These tests verify the runtime MCP tools work correctly end-to-end, +using a subprocess-based XML-RPC server (no Docker required). + +Run with: pytest tests/integration/test_mcp_runtime.py -v +""" + +from __future__ import annotations + +import socket +import subprocess +import sys +import textwrap +import time +from contextlib import closing +from pathlib import Path +from typing import Any, Generator + +import pytest +from fastmcp import Client, FastMCP + +from gnuradio_mcp.middlewares.xmlrpc import XmlRpcMiddleware +from gnuradio_mcp.providers.mcp_runtime import McpRuntimeProvider +from gnuradio_mcp.providers.runtime import RuntimeProvider + + +def extract_raw_value(result) -> Any: + """Extract raw value from FastMCP result. + + When a tool returns a non-Pydantic value (like int, float, bool), + FastMCP serializes it as TextContent. This helper parses it back. + """ + if result.data is not None: + return result.data + if result.content and len(result.content) > 0: + text = result.content[0].text + # Try to parse as float first (handles scientific notation) + try: + return float(text) + except ValueError: + pass + # Try int + try: + return int(text) + except ValueError: + pass + # Return as string + return text + return None + + +def find_free_port() -> int: + """Find an available port on localhost.""" + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + +@pytest.fixture +def xmlrpc_server_script(tmp_path: Path) -> Path: + """Create a test XML-RPC server script that mimics GNU Radio.""" + script = tmp_path / "test_xmlrpc_server.py" + script.write_text( + textwrap.dedent( + '''\ + #!/usr/bin/env python3 + """Test XML-RPC server mimicking GNU Radio flowgraph interface.""" + + import os + from xmlrpc.server import SimpleXMLRPCServer + + PORT = int(os.environ.get("XMLRPC_PORT", 8080)) + + _variables = { + "frequency": 101.1e6, + "amplitude": 0.5, + "gain": 10, + } + + def get_frequency(): + return _variables["frequency"] + + def set_frequency(val): + _variables["frequency"] = float(val) + + def get_amplitude(): + return _variables["amplitude"] + + def set_amplitude(val): + _variables["amplitude"] = float(val) + + def get_gain(): + return _variables["gain"] + + def set_gain(val): + _variables["gain"] = int(val) + + def start(): + pass + + def stop(): + pass + + def lock(): + pass + + def unlock(): + pass + + def main(): + server = SimpleXMLRPCServer(("0.0.0.0", PORT), allow_none=True) + server.register_introspection_functions() + server.register_function(get_frequency) + server.register_function(set_frequency) + server.register_function(get_amplitude) + server.register_function(set_amplitude) + server.register_function(get_gain) + server.register_function(set_gain) + server.register_function(start) + server.register_function(stop) + server.register_function(lock) + server.register_function(unlock) + server.serve_forever() + + if __name__ == "__main__": + main() + ''' + ) + ) + return script + + +@pytest.fixture +def xmlrpc_server( + xmlrpc_server_script: Path, +) -> Generator[tuple[subprocess.Popen, int], None, None]: + """Start the XML-RPC server subprocess.""" + port = find_free_port() + env = {**dict(__import__("os").environ), "XMLRPC_PORT": str(port)} + + proc = subprocess.Popen( + [sys.executable, str(xmlrpc_server_script)], + env=env, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + # Wait for server to be ready + deadline = time.time() + 10 + while time.time() < deadline: + if proc.poll() is not None: + stdout, stderr = proc.communicate() + raise RuntimeError(f"Server exited: {stderr.decode()} {stdout.decode()}") + try: + sock = socket.create_connection(("127.0.0.1", port), timeout=0.5) + sock.close() + break + except (ConnectionRefusedError, OSError): + time.sleep(0.1) + else: + proc.kill() + raise RuntimeError("XML-RPC server did not start in time") + + yield proc, port + + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + + +@pytest.fixture +def runtime_mcp_app() -> FastMCP: + """Create FastMCP app with runtime tools (no Docker).""" + app = FastMCP("gr-mcp-runtime-test") + # RuntimeProvider without Docker — XML-RPC tools still available + provider = RuntimeProvider(docker_mw=None) + McpRuntimeProvider(app, provider) + return app + + +@pytest.fixture +async def runtime_client(runtime_mcp_app: FastMCP): + """Create FastMCP client for runtime tools.""" + async with Client(runtime_mcp_app) as client: + yield client + + +class TestRuntimeMcpToolsNoConnection: + """Test runtime tools before connecting to a server.""" + + async def test_get_status_not_connected(self, runtime_client: Client): + """get_status should work without connection, showing disconnected state.""" + result = await runtime_client.call_tool(name="get_status") + + assert result.data is not None + assert result.data.connected is False + assert result.data.connection is None + + async def test_list_variables_requires_connection(self, runtime_client: Client): + """list_variables should raise when not connected.""" + with pytest.raises(Exception, match="Not connected"): + await runtime_client.call_tool(name="list_variables") + + async def test_get_variable_requires_connection(self, runtime_client: Client): + """get_variable should raise when not connected.""" + with pytest.raises(Exception, match="Not connected"): + await runtime_client.call_tool( + name="get_variable", arguments={"name": "frequency"} + ) + + async def test_disconnect_idempotent(self, runtime_client: Client): + """disconnect should succeed even when not connected.""" + result = await runtime_client.call_tool(name="disconnect") + assert result.data is True + + +class TestRuntimeMcpToolsConnected: + """Test runtime tools connected to XML-RPC server.""" + + async def test_connect_success( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test connecting to XML-RPC server via MCP tool.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + result = await runtime_client.call_tool(name="connect", arguments={"url": url}) + + assert result.data is not None + assert result.data.url == url + assert "get_frequency" in result.data.methods + + async def test_connect_updates_status( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """After connecting, status should show connected.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + result = await runtime_client.call_tool(name="get_status") + + assert result.data.connected is True + assert result.data.connection.url == url + + async def test_list_variables( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test listing variables after connecting.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + result = await runtime_client.call_tool(name="list_variables") + + assert result.data is not None + names = {v.name for v in result.data} + assert "frequency" in names + assert "amplitude" in names + assert "gain" in names + + async def test_get_variable( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test getting a variable value.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + result = await runtime_client.call_tool( + name="get_variable", arguments={"name": "frequency"} + ) + + # get_variable returns raw values (float), not Pydantic models + assert extract_raw_value(result) == 101.1e6 + + async def test_set_variable( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test setting a variable value.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + + # Set new value + set_result = await runtime_client.call_tool( + name="set_variable", arguments={"name": "frequency", "value": 107.2e6} + ) + assert set_result.data is True + + # Verify it was set + get_result = await runtime_client.call_tool( + name="get_variable", arguments={"name": "frequency"} + ) + assert extract_raw_value(get_result) == 107.2e6 + + async def test_flowgraph_control_start( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test starting the flowgraph.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + result = await runtime_client.call_tool(name="start") + + assert result.data is True + + async def test_flowgraph_control_stop( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test stopping the flowgraph.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + result = await runtime_client.call_tool(name="stop") + + assert result.data is True + + async def test_flowgraph_control_lock_unlock( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test lock/unlock sequence.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + + lock_result = await runtime_client.call_tool(name="lock") + assert lock_result.data is True + + unlock_result = await runtime_client.call_tool(name="unlock") + assert unlock_result.data is True + + async def test_disconnect_clears_connection( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test disconnecting clears the connection state.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + await runtime_client.call_tool(name="disconnect") + + # Status should show disconnected + result = await runtime_client.call_tool(name="get_status") + assert result.data.connected is False + + +class TestRuntimeMcpToolsFullWorkflow: + """End-to-end workflow tests.""" + + async def test_tuning_workflow( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test a complete tuning workflow: connect, read, tune, verify.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + # Connect + await runtime_client.call_tool(name="connect", arguments={"url": url}) + + # Read initial frequency + initial = await runtime_client.call_tool( + name="get_variable", arguments={"name": "frequency"} + ) + assert extract_raw_value(initial) == 101.1e6 + + # Tune to new frequency with lock/unlock + await runtime_client.call_tool(name="lock") + await runtime_client.call_tool( + name="set_variable", arguments={"name": "frequency", "value": 98.5e6} + ) + await runtime_client.call_tool(name="unlock") + + # Verify + final = await runtime_client.call_tool( + name="get_variable", arguments={"name": "frequency"} + ) + assert extract_raw_value(final) == 98.5e6 + + # Disconnect + await runtime_client.call_tool(name="disconnect") + + async def test_scan_and_tune_workflow( + self, runtime_client: Client, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Simulate scanning through frequencies (mimics FM scanner use case).""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + await runtime_client.call_tool(name="connect", arguments={"url": url}) + + # Scan through several frequencies + test_frequencies = [88.1e6, 91.5e6, 95.7e6, 101.1e6, 107.9e6] + + for freq in test_frequencies: + await runtime_client.call_tool( + name="set_variable", arguments={"name": "frequency", "value": freq} + ) + result = await runtime_client.call_tool( + name="get_variable", arguments={"name": "frequency"} + ) + assert extract_raw_value(result) == freq + + await runtime_client.call_tool(name="disconnect") diff --git a/tests/integration/test_xmlrpc_subprocess.py b/tests/integration/test_xmlrpc_subprocess.py new file mode 100644 index 0000000..20b4d24 --- /dev/null +++ b/tests/integration/test_xmlrpc_subprocess.py @@ -0,0 +1,412 @@ +"""Integration tests for XmlRpcMiddleware using a real subprocess. + +These tests spawn an actual XML-RPC server without requiring Docker, +making them faster and more reliable for CI/CD pipelines. + +Run with: pytest tests/integration/test_xmlrpc_subprocess.py -v +""" + +from __future__ import annotations + +import socket +import subprocess +import sys +import textwrap +import time +from contextlib import closing +from pathlib import Path +from typing import Generator + +import pytest + +from gnuradio_mcp.middlewares.xmlrpc import XmlRpcMiddleware +from gnuradio_mcp.models import ConnectionInfoModel, VariableModel + + +def find_free_port() -> int: + """Find an available port on localhost.""" + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + +@pytest.fixture +def xmlrpc_server_script(tmp_path: Path) -> Path: + """Create a test XML-RPC server script that mimics GNU Radio. + + This server simulates the XML-RPC interface exposed by GNU Radio + flowgraphs, including: + - get_*/set_* variable accessors + - start/stop/lock/unlock flowgraph control + - system.listMethods introspection (optional) + """ + script = tmp_path / "test_xmlrpc_server.py" + script.write_text( + textwrap.dedent( + '''\ + #!/usr/bin/env python3 + """Test XML-RPC server mimicking GNU Radio flowgraph interface.""" + + import os + import sys + from xmlrpc.server import SimpleXMLRPCServer + + PORT = int(os.environ.get("XMLRPC_PORT", 8080)) + ENABLE_INTROSPECTION = os.environ.get("ENABLE_INTROSPECTION", "1") == "1" + + # Simulated flowgraph variables with various types + _variables = { + "frequency": 101.1e6, # float (Hz) + "amplitude": 0.5, # float (0-1) + "gain": 10, # int (dB) + "enabled": True, # bool + } + + _flowgraph_state = { + "running": False, + "locked": False, + } + + + # Variable accessors (GNU Radio pattern: get_ / set_) + def get_frequency(): + return _variables["frequency"] + + def set_frequency(val): + _variables["frequency"] = float(val) + + def get_amplitude(): + return _variables["amplitude"] + + def set_amplitude(val): + _variables["amplitude"] = float(val) + + def get_gain(): + return _variables["gain"] + + def set_gain(val): + _variables["gain"] = int(val) + + def get_enabled(): + return _variables["enabled"] + + def set_enabled(val): + _variables["enabled"] = bool(val) + + + # Read-only variable (no setter) + def get_sample_rate(): + return 2.4e6 + + + # Flowgraph control + def start(): + _flowgraph_state["running"] = True + print("Flowgraph started", file=sys.stderr, flush=True) + + def stop(): + _flowgraph_state["running"] = False + print("Flowgraph stopped", file=sys.stderr, flush=True) + + def lock(): + _flowgraph_state["locked"] = True + print("Flowgraph locked", file=sys.stderr, flush=True) + + def unlock(): + _flowgraph_state["locked"] = False + print("Flowgraph unlocked", file=sys.stderr, flush=True) + + + def main(): + server = SimpleXMLRPCServer(("0.0.0.0", PORT), allow_none=True) + + if ENABLE_INTROSPECTION: + server.register_introspection_functions() + + # Register all functions + server.register_function(get_frequency) + server.register_function(set_frequency) + server.register_function(get_amplitude) + server.register_function(set_amplitude) + server.register_function(get_gain) + server.register_function(set_gain) + server.register_function(get_enabled) + server.register_function(set_enabled) + server.register_function(get_sample_rate) + server.register_function(start) + server.register_function(stop) + server.register_function(lock) + server.register_function(unlock) + + print(f"XML-RPC server ready on port {PORT}", file=sys.stderr, flush=True) + server.serve_forever() + + + if __name__ == "__main__": + main() + ''' + ) + ) + return script + + +@pytest.fixture +def xmlrpc_server( + xmlrpc_server_script: Path, +) -> Generator[tuple[subprocess.Popen, int], None, None]: + """Start the XML-RPC server subprocess and wait for it to be ready.""" + port = find_free_port() + env = {**dict(__import__("os").environ), "XMLRPC_PORT": str(port)} + + proc = subprocess.Popen( + [sys.executable, str(xmlrpc_server_script)], + env=env, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + # Wait for server to be ready (reads "ready" from stderr) + deadline = time.time() + 10 + while time.time() < deadline: + if proc.poll() is not None: + # Process exited unexpectedly + stdout, stderr = proc.communicate() + raise RuntimeError( + f"XML-RPC server exited: {stderr.decode()} {stdout.decode()}" + ) + + # Try connecting + try: + sock = socket.create_connection(("127.0.0.1", port), timeout=0.5) + sock.close() + break + except (ConnectionRefusedError, OSError): + time.sleep(0.1) + else: + proc.kill() + raise RuntimeError("XML-RPC server did not start in time") + + yield proc, port + + # Cleanup + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + + +class TestXmlRpcMiddlewareIntegration: + """Integration tests for XmlRpcMiddleware against a real server.""" + + def test_connect_success(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test connecting to a real XML-RPC server.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + mw = XmlRpcMiddleware.connect(url) + + assert mw is not None + assert mw._url == url + + def test_connect_failure(self): + """Test connection failure to non-existent server.""" + # Use a port that's very unlikely to be in use + url = "http://127.0.0.1:59999" + + with pytest.raises(ConnectionRefusedError): + XmlRpcMiddleware.connect(url) + + def test_get_connection_info(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test connection info with introspection enabled.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + info = mw.get_connection_info(xmlrpc_port=port) + + assert isinstance(info, ConnectionInfoModel) + assert info.xmlrpc_port == port + # Should have our methods (excluding system.*) + assert "get_frequency" in info.methods + assert "set_frequency" in info.methods + assert "start" in info.methods + + def test_list_variables_discovers_all( + self, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test variable discovery finds all get_*/set_* pairs.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + variables = mw.list_variables() + + names = {v.name for v in variables} + # These have both get_ and set_ + assert "frequency" in names + assert "amplitude" in names + assert "gain" in names + assert "enabled" in names + # sample_rate has only get_, should be excluded + assert "sample_rate" not in names + + def test_list_variables_retrieves_values( + self, xmlrpc_server: tuple[subprocess.Popen, int] + ): + """Test that list_variables retrieves actual values.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + variables = mw.list_variables() + var_dict = {v.name: v.value for v in variables} + + assert var_dict["frequency"] == 101.1e6 + assert var_dict["amplitude"] == 0.5 + assert var_dict["gain"] == 10 + assert var_dict["enabled"] is True + + def test_get_variable_float(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test reading a float variable.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + value = mw.get_variable("frequency") + + assert value == 101.1e6 + assert isinstance(value, float) + + def test_get_variable_int(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test reading an integer variable.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + value = mw.get_variable("gain") + + assert value == 10 + assert isinstance(value, int) + + def test_get_variable_bool(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test reading a boolean variable.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + value = mw.get_variable("enabled") + + assert value is True + assert isinstance(value, bool) + + def test_set_variable_float(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test setting a float variable and reading it back.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + # Set new value + result = mw.set_variable("frequency", 107.2e6) + assert result is True + + # Verify it was set + value = mw.get_variable("frequency") + assert value == 107.2e6 + + def test_set_variable_int(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test setting an integer variable.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + mw.set_variable("gain", 20) + value = mw.get_variable("gain") + + assert value == 20 + + def test_set_variable_bool(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test setting a boolean variable.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + mw.set_variable("enabled", False) + value = mw.get_variable("enabled") + + assert value is False + + +class TestFlowgraphControlIntegration: + """Integration tests for flowgraph control commands.""" + + def test_start(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test starting the flowgraph.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + result = mw.start() + + assert result is True + + def test_stop(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test stopping the flowgraph.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + result = mw.stop() + + assert result is True + + def test_lock(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test locking the flowgraph.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + result = mw.lock() + + assert result is True + + def test_unlock(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test unlocking the flowgraph.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + result = mw.unlock() + + assert result is True + + def test_lock_unlock_sequence(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test the lock/unlock sequence used for thread-safe updates.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + # Typical GNU Radio pattern: lock, update, unlock + assert mw.lock() is True + mw.set_variable("frequency", 98.5e6) + assert mw.unlock() is True + + # Verify the update took effect + assert mw.get_variable("frequency") == 98.5e6 + + +class TestConnectionLifecycle: + """Tests for connection management.""" + + def test_close_clears_proxy(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test that close() clears the proxy reference.""" + _, port = xmlrpc_server + mw = XmlRpcMiddleware.connect(f"http://127.0.0.1:{port}") + + mw.close() + + assert mw._proxy is None + + def test_reconnect_after_close(self, xmlrpc_server: tuple[subprocess.Popen, int]): + """Test reconnecting after closing.""" + _, port = xmlrpc_server + url = f"http://127.0.0.1:{port}" + + # First connection + mw1 = XmlRpcMiddleware.connect(url) + mw1.set_variable("gain", 15) + mw1.close() + + # Reconnect + mw2 = XmlRpcMiddleware.connect(url) + value = mw2.get_variable("gain") + + # Value should persist (server is still running) + assert value == 15