Add SKYWALKER_MOCK=1 mode for hardware-free MCP integration testing

Extract MockSkyWalker1 to shared mock_device.py (used by both unit tests
and server lifespan). Server checks SKYWALKER_MOCK env var at startup —
when set, uses mock device instead of USB hardware, enabling full MCP
transport testing via claude -p without the dongle connected.

Verified: 64 unit tests pass, claude -p integration tests exercise
identify_frequency, get_device_status, and sweep_spectrum through the
complete JSON-RPC pipeline.
This commit is contained in:
Ryan Malloy 2026-02-17 15:23:05 -07:00
parent 4a63dbbb9d
commit cbfbbf6bfe
4 changed files with 150 additions and 133 deletions

View File

@ -0,0 +1,136 @@
"""Mock SkyWalker1 device for testing without USB hardware.
Used by:
- Server lifespan when SKYWALKER_MOCK=1 is set (integration testing)
- conftest.py fixtures for unit tests
"""
class MockSkyWalker1:
"""Returns plausible data for all SkyWalker1 API calls without USB hardware."""
def __init__(self, verbose=False):
self.verbose = verbose
self._motor_halted = False
self._armed = False
self._lnb_on = True
self._calls = []
def _record(self, method, *args, **kwargs):
self._calls.append((method, args, kwargs))
def open(self):
self._record("open")
def close(self):
self._record("close")
def ensure_booted(self):
self._record("ensure_booted")
def get_fw_version(self):
self._record("get_fw_version")
return {"version": "3.05.0-mock", "date": "2026-02-17", "raw": b"\x03\x05\x00"}
def get_config(self):
self._record("get_config")
return 0x3F
def get_usb_speed(self):
self._record("get_usb_speed")
return 2
def get_serial_number(self):
self._record("get_serial_number")
return bytes([0xDE, 0xAD, 0xBE, 0xEF])
def get_last_error(self):
self._record("get_last_error")
return 0x00
def signal_monitor(self):
self._record("signal_monitor")
return {
"snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5,
"agc1": 1200, "agc2": 800, "power_db": -45.3,
"locked": True, "lock": 0x1F, "status": 0x01,
}
def get_stream_diag(self, reset=False):
self._record("get_stream_diag", reset=reset)
return {"poll_count": 100, "overflow_count": 0, "sync_loss": 0, "armed": self._armed}
def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, dwell_ms=15):
self._record("sweep_spectrum", start_mhz, stop_mhz, step_mhz=step_mhz, dwell_ms=dwell_ms)
n_points = int((stop_mhz - start_mhz) / step_mhz) + 1
freqs = [start_mhz + i * step_mhz for i in range(n_points)]
powers = []
for f in freqs:
base = -50.0
if abs(f - 1420.0) < 5:
base += 8.0 * (1.0 - abs(f - 1420.0) / 5.0)
powers.append(base)
raw = [(int((p + 60) * 100), 0) for p in powers]
return freqs, powers, raw
def tune_monitor(self, sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms):
self._record("tune_monitor", sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms)
return {
"snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0,
"agc1": 1100, "agc2": 750, "power_db": -46.1,
"locked": True, "lock": 0x1F, "status": 0x01,
"dwell_ms": dwell_ms,
}
def adaptive_blind_scan(self, freq_khz, sr_min, sr_max, sr_step):
self._record("adaptive_blind_scan", freq_khz, sr_min, sr_max, sr_step)
return {"freq_khz": freq_khz, "locked": True, "sr_sps": 20000000}
def motor_halt(self):
self._record("motor_halt")
self._motor_halted = True
def motor_drive_east(self, steps):
self._record("motor_drive_east", steps)
self._motor_halted = False
def motor_drive_west(self, steps):
self._record("motor_drive_west", steps)
self._motor_halted = False
def motor_goto_position(self, slot):
self._record("motor_goto_position", slot)
def motor_goto_x(self, observer_lon, sat_lon):
self._record("motor_goto_x", observer_lon, sat_lon)
def motor_store_position(self, slot):
self._record("motor_store_position", slot)
def start_intersil(self, on=True):
self._record("start_intersil", on=on)
self._lnb_on = on
def set_lnb_voltage(self, high):
self._record("set_lnb_voltage", high)
def set_22khz_tone(self, on):
self._record("set_22khz_tone", on)
def i2c_bus_scan(self):
self._record("i2c_bus_scan")
return [0x08, 0x61, 0x51]
def i2c_raw_read(self, slave, register):
self._record("i2c_raw_read", slave, register)
return 0xAB
def arm_transfer(self, on):
self._record("arm_transfer", on)
self._armed = on
def read_stream(self, timeout=500):
self._record("read_stream", timeout=timeout)
if self._armed:
return bytes([0x47, 0x00, 0x00, 0x10] + [0xFF] * 184)
return None

View File

@ -6,6 +6,7 @@ function accessible to LLMs. Thread-safe concurrent access via asyncio.to_thread
and a reentrant lock, following the same USBBridge pattern used by the TUI. and a reentrant lock, following the same USBBridge pattern used by the TUI.
""" """
import os
import sys import sys
import asyncio import asyncio
import threading import threading
@ -100,8 +101,17 @@ _bridge: DeviceBridge | None = None
@asynccontextmanager @asynccontextmanager
async def lifespan(server: FastMCP): async def lifespan(server: FastMCP):
"""Open the USB device on startup, close on shutdown.""" """Open the USB device on startup, close on shutdown.
Set SKYWALKER_MOCK=1 to use a mock device for integration testing
without USB hardware.
"""
global _bridge global _bridge
if os.environ.get("SKYWALKER_MOCK"):
from skywalker_mcp.mock_device import MockSkyWalker1
dev = MockSkyWalker1(verbose=False)
print("skywalker-mcp: MOCK MODE — no USB hardware", file=sys.stderr)
else:
dev = SkyWalker1(verbose=False) dev = SkyWalker1(verbose=False)
try: try:
dev.open() dev.open()

View File

@ -5,139 +5,10 @@ from unittest.mock import MagicMock
import pytest import pytest
import skywalker_mcp.server as srv import skywalker_mcp.server as srv
from skywalker_mcp.mock_device import MockSkyWalker1
from skywalker_mcp.server import DeviceBridge from skywalker_mcp.server import DeviceBridge
class MockSkyWalker1:
"""Mock SkyWalker1 device that returns plausible data without USB hardware."""
def __init__(self, verbose=False):
self.verbose = verbose
self._motor_halted = False
self._armed = False
self._lnb_on = True
self._calls = []
def _record(self, method, *args, **kwargs):
self._calls.append((method, args, kwargs))
def open(self):
self._record("open")
def close(self):
self._record("close")
def ensure_booted(self):
self._record("ensure_booted")
def get_fw_version(self):
self._record("get_fw_version")
return {"version": "3.05.0-test", "date": "2026-02-17", "raw": b"\x03\x05\x00"}
def get_config(self):
self._record("get_config")
return 0x3F
def get_usb_speed(self):
self._record("get_usb_speed")
return 2
def get_serial_number(self):
self._record("get_serial_number")
return bytes([0xDE, 0xAD, 0xBE, 0xEF])
def get_last_error(self):
self._record("get_last_error")
return 0x00
def signal_monitor(self):
self._record("signal_monitor")
return {
"snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5,
"agc1": 1200, "agc2": 800, "power_db": -45.3,
"locked": True, "lock": 0x1F, "status": 0x01,
}
def get_stream_diag(self, reset=False):
self._record("get_stream_diag", reset=reset)
return {"poll_count": 100, "overflow_count": 0, "sync_loss": 0, "armed": self._armed}
def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, dwell_ms=15):
self._record("sweep_spectrum", start_mhz, stop_mhz, step_mhz=step_mhz, dwell_ms=dwell_ms)
n_points = int((stop_mhz - start_mhz) / step_mhz) + 1
freqs = [start_mhz + i * step_mhz for i in range(n_points)]
powers = []
for f in freqs:
base = -50.0
if abs(f - 1420.0) < 5:
base += 8.0 * (1.0 - abs(f - 1420.0) / 5.0)
powers.append(base)
raw = [(int((p + 60) * 100), 0) for p in powers]
return freqs, powers, raw
def tune_monitor(self, sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms):
self._record("tune_monitor", sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms)
return {
"snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0,
"agc1": 1100, "agc2": 750, "power_db": -46.1,
"locked": True, "lock": 0x1F, "status": 0x01,
"dwell_ms": dwell_ms,
}
def adaptive_blind_scan(self, freq_khz, sr_min, sr_max, sr_step):
self._record("adaptive_blind_scan", freq_khz, sr_min, sr_max, sr_step)
return {"freq_khz": freq_khz, "locked": True, "sr_sps": 20000000}
def motor_halt(self):
self._record("motor_halt")
self._motor_halted = True
def motor_drive_east(self, steps):
self._record("motor_drive_east", steps)
self._motor_halted = False
def motor_drive_west(self, steps):
self._record("motor_drive_west", steps)
self._motor_halted = False
def motor_goto_position(self, slot):
self._record("motor_goto_position", slot)
def motor_goto_x(self, observer_lon, sat_lon):
self._record("motor_goto_x", observer_lon, sat_lon)
def motor_store_position(self, slot):
self._record("motor_store_position", slot)
def start_intersil(self, on=True):
self._record("start_intersil", on=on)
self._lnb_on = on
def set_lnb_voltage(self, high):
self._record("set_lnb_voltage", high)
def set_22khz_tone(self, on):
self._record("set_22khz_tone", on)
def i2c_bus_scan(self):
self._record("i2c_bus_scan")
return [0x08, 0x61, 0x51]
def i2c_raw_read(self, slave, register):
self._record("i2c_raw_read", slave, register)
return 0xAB
def arm_transfer(self, on):
self._record("arm_transfer", on)
self._armed = on
def read_stream(self, timeout=500):
self._record("read_stream", timeout=timeout)
if self._armed:
return bytes([0x47, 0x00, 0x00, 0x10] + [0xFF] * 184)
return None
class MockContext: class MockContext:
"""Minimal mock of FastMCP Context for direct tool function calls. """Minimal mock of FastMCP Context for direct tool function calls.

View File

@ -87,7 +87,7 @@ def test_prompt_count():
async def test_get_device_status(ctx): async def test_get_device_status(ctx):
result = await get_device_status(ctx) result = await get_device_status(ctx)
assert "3.05.0-test" in result["firmware"]["version"] assert "3.05.0" in result["firmware"]["version"]
assert result["usb_speed"] == "High (480 Mbps)" assert result["usb_speed"] == "High (480 Mbps)"
assert "de ad be ef" in result["serial"] assert "de ad be ef" in result["serial"]