diff --git a/mcp/skywalker-mcp/src/skywalker_mcp/mock_device.py b/mcp/skywalker-mcp/src/skywalker_mcp/mock_device.py new file mode 100644 index 0000000..2054297 --- /dev/null +++ b/mcp/skywalker-mcp/src/skywalker_mcp/mock_device.py @@ -0,0 +1,136 @@ +"""Mock SkyWalker1 device for testing without USB hardware. + +Used by: + - Server lifespan when SKYWALKER_MOCK=1 is set (integration testing) + - conftest.py fixtures for unit tests +""" + + +class MockSkyWalker1: + """Returns plausible data for all SkyWalker1 API calls without USB hardware.""" + + def __init__(self, verbose=False): + self.verbose = verbose + self._motor_halted = False + self._armed = False + self._lnb_on = True + self._calls = [] + + def _record(self, method, *args, **kwargs): + self._calls.append((method, args, kwargs)) + + def open(self): + self._record("open") + + def close(self): + self._record("close") + + def ensure_booted(self): + self._record("ensure_booted") + + def get_fw_version(self): + self._record("get_fw_version") + return {"version": "3.05.0-mock", "date": "2026-02-17", "raw": b"\x03\x05\x00"} + + def get_config(self): + self._record("get_config") + return 0x3F + + def get_usb_speed(self): + self._record("get_usb_speed") + return 2 + + def get_serial_number(self): + self._record("get_serial_number") + return bytes([0xDE, 0xAD, 0xBE, 0xEF]) + + def get_last_error(self): + self._record("get_last_error") + return 0x00 + + def signal_monitor(self): + self._record("signal_monitor") + return { + "snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5, + "agc1": 1200, "agc2": 800, "power_db": -45.3, + "locked": True, "lock": 0x1F, "status": 0x01, + } + + def get_stream_diag(self, reset=False): + self._record("get_stream_diag", reset=reset) + return {"poll_count": 100, "overflow_count": 0, "sync_loss": 0, "armed": self._armed} + + def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, dwell_ms=15): + self._record("sweep_spectrum", start_mhz, stop_mhz, step_mhz=step_mhz, dwell_ms=dwell_ms) + n_points = int((stop_mhz - start_mhz) / step_mhz) + 1 + freqs = [start_mhz + i * step_mhz for i in range(n_points)] + powers = [] + for f in freqs: + base = -50.0 + if abs(f - 1420.0) < 5: + base += 8.0 * (1.0 - abs(f - 1420.0) / 5.0) + powers.append(base) + raw = [(int((p + 60) * 100), 0) for p in powers] + return freqs, powers, raw + + def tune_monitor(self, sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms): + self._record("tune_monitor", sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms) + return { + "snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0, + "agc1": 1100, "agc2": 750, "power_db": -46.1, + "locked": True, "lock": 0x1F, "status": 0x01, + "dwell_ms": dwell_ms, + } + + def adaptive_blind_scan(self, freq_khz, sr_min, sr_max, sr_step): + self._record("adaptive_blind_scan", freq_khz, sr_min, sr_max, sr_step) + return {"freq_khz": freq_khz, "locked": True, "sr_sps": 20000000} + + def motor_halt(self): + self._record("motor_halt") + self._motor_halted = True + + def motor_drive_east(self, steps): + self._record("motor_drive_east", steps) + self._motor_halted = False + + def motor_drive_west(self, steps): + self._record("motor_drive_west", steps) + self._motor_halted = False + + def motor_goto_position(self, slot): + self._record("motor_goto_position", slot) + + def motor_goto_x(self, observer_lon, sat_lon): + self._record("motor_goto_x", observer_lon, sat_lon) + + def motor_store_position(self, slot): + self._record("motor_store_position", slot) + + def start_intersil(self, on=True): + self._record("start_intersil", on=on) + self._lnb_on = on + + def set_lnb_voltage(self, high): + self._record("set_lnb_voltage", high) + + def set_22khz_tone(self, on): + self._record("set_22khz_tone", on) + + def i2c_bus_scan(self): + self._record("i2c_bus_scan") + return [0x08, 0x61, 0x51] + + def i2c_raw_read(self, slave, register): + self._record("i2c_raw_read", slave, register) + return 0xAB + + def arm_transfer(self, on): + self._record("arm_transfer", on) + self._armed = on + + def read_stream(self, timeout=500): + self._record("read_stream", timeout=timeout) + if self._armed: + return bytes([0x47, 0x00, 0x00, 0x10] + [0xFF] * 184) + return None diff --git a/mcp/skywalker-mcp/src/skywalker_mcp/server.py b/mcp/skywalker-mcp/src/skywalker_mcp/server.py index 930c652..53e4a33 100644 --- a/mcp/skywalker-mcp/src/skywalker_mcp/server.py +++ b/mcp/skywalker-mcp/src/skywalker_mcp/server.py @@ -6,6 +6,7 @@ function accessible to LLMs. Thread-safe concurrent access via asyncio.to_thread and a reentrant lock, following the same USBBridge pattern used by the TUI. """ +import os import sys import asyncio import threading @@ -100,9 +101,18 @@ _bridge: DeviceBridge | None = None @asynccontextmanager async def lifespan(server: FastMCP): - """Open the USB device on startup, close on shutdown.""" + """Open the USB device on startup, close on shutdown. + + Set SKYWALKER_MOCK=1 to use a mock device for integration testing + without USB hardware. + """ global _bridge - dev = SkyWalker1(verbose=False) + if os.environ.get("SKYWALKER_MOCK"): + from skywalker_mcp.mock_device import MockSkyWalker1 + dev = MockSkyWalker1(verbose=False) + print("skywalker-mcp: MOCK MODE — no USB hardware", file=sys.stderr) + else: + dev = SkyWalker1(verbose=False) try: dev.open() dev.ensure_booted() diff --git a/mcp/skywalker-mcp/tests/conftest.py b/mcp/skywalker-mcp/tests/conftest.py index dd6ba73..6010bb2 100644 --- a/mcp/skywalker-mcp/tests/conftest.py +++ b/mcp/skywalker-mcp/tests/conftest.py @@ -5,139 +5,10 @@ from unittest.mock import MagicMock import pytest import skywalker_mcp.server as srv +from skywalker_mcp.mock_device import MockSkyWalker1 from skywalker_mcp.server import DeviceBridge -class MockSkyWalker1: - """Mock SkyWalker1 device that returns plausible data without USB hardware.""" - - def __init__(self, verbose=False): - self.verbose = verbose - self._motor_halted = False - self._armed = False - self._lnb_on = True - self._calls = [] - - def _record(self, method, *args, **kwargs): - self._calls.append((method, args, kwargs)) - - def open(self): - self._record("open") - - def close(self): - self._record("close") - - def ensure_booted(self): - self._record("ensure_booted") - - def get_fw_version(self): - self._record("get_fw_version") - return {"version": "3.05.0-test", "date": "2026-02-17", "raw": b"\x03\x05\x00"} - - def get_config(self): - self._record("get_config") - return 0x3F - - def get_usb_speed(self): - self._record("get_usb_speed") - return 2 - - def get_serial_number(self): - self._record("get_serial_number") - return bytes([0xDE, 0xAD, 0xBE, 0xEF]) - - def get_last_error(self): - self._record("get_last_error") - return 0x00 - - def signal_monitor(self): - self._record("signal_monitor") - return { - "snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5, - "agc1": 1200, "agc2": 800, "power_db": -45.3, - "locked": True, "lock": 0x1F, "status": 0x01, - } - - def get_stream_diag(self, reset=False): - self._record("get_stream_diag", reset=reset) - return {"poll_count": 100, "overflow_count": 0, "sync_loss": 0, "armed": self._armed} - - def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, dwell_ms=15): - self._record("sweep_spectrum", start_mhz, stop_mhz, step_mhz=step_mhz, dwell_ms=dwell_ms) - n_points = int((stop_mhz - start_mhz) / step_mhz) + 1 - freqs = [start_mhz + i * step_mhz for i in range(n_points)] - powers = [] - for f in freqs: - base = -50.0 - if abs(f - 1420.0) < 5: - base += 8.0 * (1.0 - abs(f - 1420.0) / 5.0) - powers.append(base) - raw = [(int((p + 60) * 100), 0) for p in powers] - return freqs, powers, raw - - def tune_monitor(self, sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms): - self._record("tune_monitor", sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms) - return { - "snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0, - "agc1": 1100, "agc2": 750, "power_db": -46.1, - "locked": True, "lock": 0x1F, "status": 0x01, - "dwell_ms": dwell_ms, - } - - def adaptive_blind_scan(self, freq_khz, sr_min, sr_max, sr_step): - self._record("adaptive_blind_scan", freq_khz, sr_min, sr_max, sr_step) - return {"freq_khz": freq_khz, "locked": True, "sr_sps": 20000000} - - def motor_halt(self): - self._record("motor_halt") - self._motor_halted = True - - def motor_drive_east(self, steps): - self._record("motor_drive_east", steps) - self._motor_halted = False - - def motor_drive_west(self, steps): - self._record("motor_drive_west", steps) - self._motor_halted = False - - def motor_goto_position(self, slot): - self._record("motor_goto_position", slot) - - def motor_goto_x(self, observer_lon, sat_lon): - self._record("motor_goto_x", observer_lon, sat_lon) - - def motor_store_position(self, slot): - self._record("motor_store_position", slot) - - def start_intersil(self, on=True): - self._record("start_intersil", on=on) - self._lnb_on = on - - def set_lnb_voltage(self, high): - self._record("set_lnb_voltage", high) - - def set_22khz_tone(self, on): - self._record("set_22khz_tone", on) - - def i2c_bus_scan(self): - self._record("i2c_bus_scan") - return [0x08, 0x61, 0x51] - - def i2c_raw_read(self, slave, register): - self._record("i2c_raw_read", slave, register) - return 0xAB - - def arm_transfer(self, on): - self._record("arm_transfer", on) - self._armed = on - - def read_stream(self, timeout=500): - self._record("read_stream", timeout=timeout) - if self._armed: - return bytes([0x47, 0x00, 0x00, 0x10] + [0xFF] * 184) - return None - - class MockContext: """Minimal mock of FastMCP Context for direct tool function calls. diff --git a/mcp/skywalker-mcp/tests/test_server.py b/mcp/skywalker-mcp/tests/test_server.py index 083bac6..69e5ed7 100644 --- a/mcp/skywalker-mcp/tests/test_server.py +++ b/mcp/skywalker-mcp/tests/test_server.py @@ -87,7 +87,7 @@ def test_prompt_count(): async def test_get_device_status(ctx): result = await get_device_status(ctx) - assert "3.05.0-test" in result["firmware"]["version"] + assert "3.05.0" in result["firmware"]["version"] assert result["usb_speed"] == "High (480 Mbps)" assert "de ad be ef" in result["serial"]