runtime: Phase 1 Docker + XML-RPC control

Add RuntimeProvider with 17 MCP tools for controlling running flowgraphs:
- Container lifecycle: launch, list, stop, remove
- Connection: connect by URL or container name
- Variable control: list, get, set via XML-RPC introspection
- Flowgraph execution: start, stop, lock, unlock
- Visual feedback: screenshot capture, container logs

Docker is optional - 10 tools work without it for external flowgraphs.

Includes:
- DockerMiddleware wrapping docker.DockerClient
- XmlRpcMiddleware wrapping xmlrpc.client.ServerProxy
- Dockerfile with Xvfb + ImageMagick + VNC for headless QT
- 29 new unit tests (71 total)
This commit is contained in:
Ryan Malloy 2026-01-27 09:48:44 -07:00
parent 2bef80a47a
commit 2084c41228
12 changed files with 1030 additions and 1 deletions

View File

@ -0,0 +1,26 @@
FROM librespace/gnuradio:latest
# Xvfb for headless QT rendering, ImageMagick for screenshots,
# Mesa for software OpenGL, x11vnc for optional visual debugging
RUN apt-get update && apt-get install -y --no-install-recommends \
xvfb \
x11vnc \
imagemagick \
mesa-utils \
libgl1-mesa-dri \
fonts-dejavu-core \
x11-utils \
&& rm -rf /var/lib/apt/lists/*
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
WORKDIR /flowgraphs
ENV DISPLAY=:99
ENV XMLRPC_PORT=8080
EXPOSE 8080
EXPOSE 5900
ENTRYPOINT ["/entrypoint.sh"]

21
docker/entrypoint.sh Executable file
View File

@ -0,0 +1,21 @@
#!/bin/bash
set -e
# Start Xvfb for headless QT rendering
Xvfb :99 -screen 0 1280x720x24 -ac +extension GLX +render -noreset &
XVFB_PID=$!
# Wait for Xvfb to be ready
while ! xdpyinfo -display :99 >/dev/null 2>&1; do
sleep 0.1
done
echo "Xvfb ready on :99"
# Optional VNC server for visual debugging
if [ "${ENABLE_VNC:-0}" = "1" ]; then
x11vnc -display :99 -forever -nopw -shared -rfbport 5900 &
echo "VNC server on :5900"
fi
# Run the flowgraph (passed as CMD arguments)
exec "$@"

View File

@ -4,6 +4,7 @@ from fastmcp import FastMCP
from gnuradio_mcp.middlewares.platform import PlatformMiddleware
from gnuradio_mcp.providers.mcp import McpPlatformProvider
from gnuradio_mcp.providers.mcp_runtime import McpRuntimeProvider
try:
from gnuradio import gr
@ -21,6 +22,7 @@ platform.build_library()
app: FastMCP = FastMCP("GNU Radio MCP", instructions="Create GNU Radio flowgraphs")
McpPlatformProvider.from_platform_middleware(app, PlatformMiddleware(platform))
McpRuntimeProvider.create(app)
if __name__ == "__main__":
app.run()

View File

@ -16,6 +16,9 @@ dependencies = [
]
[project.optional-dependencies]
runtime = [
"docker>=7.0",
]
dev = [
"pytest>=9.0",
"pytest-asyncio>=1.3",

View File

@ -0,0 +1,165 @@
from __future__ import annotations
import base64
import logging
from pathlib import Path
from typing import Any
from gnuradio_mcp.models import ContainerModel, ScreenshotModel
logger = logging.getLogger(__name__)
DEFAULT_XMLRPC_PORT = 8080
DEFAULT_VNC_PORT = 5900
RUNTIME_IMAGE = "gnuradio-runtime:latest"
CONTAINER_FLOWGRAPH_DIR = "/flowgraphs"
class DockerMiddleware:
"""Wraps the Docker SDK to manage GNU Radio runtime containers.
Each container runs a flowgraph with Xvfb for headless QT rendering.
XML-RPC is exposed for variable control; VNC is optional for visual debugging.
"""
def __init__(self, docker_client: Any):
self._client = docker_client
@classmethod
def create(cls) -> DockerMiddleware | None:
"""Attempt to create a DockerMiddleware. Returns None if Docker is unavailable."""
try:
import docker
client = docker.from_env()
client.ping()
return cls(client)
except Exception as e:
logger.warning("Docker unavailable: %s", e)
return None
def launch(
self,
flowgraph_path: str,
name: str,
xmlrpc_port: int = DEFAULT_XMLRPC_PORT,
enable_vnc: bool = False,
device_paths: list[str] | None = None,
) -> ContainerModel:
"""Launch a flowgraph in a Docker container with Xvfb."""
fg_path = Path(flowgraph_path).resolve()
if not fg_path.exists():
raise FileNotFoundError(f"Flowgraph not found: {fg_path}")
env = {"DISPLAY": ":99", "XMLRPC_PORT": str(xmlrpc_port)}
if enable_vnc:
env["ENABLE_VNC"] = "1"
ports: dict[str, int] = {f"{xmlrpc_port}/tcp": xmlrpc_port}
vnc_port: int | None = None
if enable_vnc:
vnc_port = DEFAULT_VNC_PORT
ports[f"{vnc_port}/tcp"] = vnc_port
volumes = {
str(fg_path.parent): {
"bind": CONTAINER_FLOWGRAPH_DIR,
"mode": "ro",
}
}
devices = [f"{d}:{d}:rwm" for d in (device_paths or [])]
container_fg_path = f"{CONTAINER_FLOWGRAPH_DIR}/{fg_path.name}"
container = self._client.containers.run(
RUNTIME_IMAGE,
command=["python3", container_fg_path],
name=name,
detach=True,
environment=env,
ports=ports,
volumes=volumes,
devices=devices or None,
labels={
"gr-mcp": "true",
"gr-mcp.flowgraph": str(fg_path),
"gr-mcp.xmlrpc-port": str(xmlrpc_port),
"gr-mcp.vnc-enabled": "1" if enable_vnc else "0",
},
)
return ContainerModel(
name=name,
container_id=container.id[:12],
status="running",
flowgraph_path=str(fg_path),
xmlrpc_port=xmlrpc_port,
vnc_port=vnc_port,
device_paths=device_paths or [],
)
def list_containers(self) -> list[ContainerModel]:
"""List all gr-mcp managed containers."""
containers = self._client.containers.list(
all=True, filters={"label": "gr-mcp=true"}
)
result = []
for c in containers:
labels = c.labels
result.append(
ContainerModel(
name=c.name,
container_id=c.id[:12],
status=c.status,
flowgraph_path=labels.get("gr-mcp.flowgraph", ""),
xmlrpc_port=int(labels.get("gr-mcp.xmlrpc-port", DEFAULT_XMLRPC_PORT)),
vnc_port=DEFAULT_VNC_PORT
if labels.get("gr-mcp.vnc-enabled") == "1" and c.status == "running"
else None,
)
)
return result
def stop(self, name: str) -> bool:
"""Stop a container by name."""
container = self._client.containers.get(name)
container.stop(timeout=10)
return True
def remove(self, name: str, force: bool = False) -> bool:
"""Remove a container by name."""
container = self._client.containers.get(name)
container.remove(force=force)
return True
def get_logs(self, name: str, tail: int = 100) -> str:
"""Get container logs."""
container = self._client.containers.get(name)
return container.logs(tail=tail).decode("utf-8", errors="replace")
def capture_screenshot(self, name: str) -> ScreenshotModel:
"""Capture the Xvfb framebuffer via ImageMagick import."""
container = self._client.containers.get(name)
exit_code, output = container.exec_run(
["import", "-display", ":99", "-window", "root", "png:-"],
)
if exit_code != 0:
raise RuntimeError(
f"Screenshot failed (exit {exit_code}): "
f"{output.decode('utf-8', errors='replace')[:200]}"
)
image_b64 = base64.b64encode(output).decode("ascii")
return ScreenshotModel(
container_name=name,
image_base64=image_b64,
format="png",
)
def get_xmlrpc_port(self, name: str) -> int:
"""Get the XML-RPC port for a container."""
container = self._client.containers.get(name)
return int(
container.labels.get("gr-mcp.xmlrpc-port", DEFAULT_XMLRPC_PORT)
)

View File

@ -0,0 +1,109 @@
from __future__ import annotations
import logging
import xmlrpc.client
from typing import Any
from gnuradio_mcp.models import ConnectionInfoModel, VariableModel
logger = logging.getLogger(__name__)
XMLRPC_TIMEOUT = 5
class XmlRpcMiddleware:
"""Wraps xmlrpc.client.ServerProxy for GNU Radio XML-RPC control.
GNU Radio flowgraphs expose an XML-RPC server when they contain an
xmlrpc_server block. Methods follow the pattern:
- get_<variable>() to read
- set_<variable>(value) to write
- start() / stop() / lock() / unlock() for execution control
"""
def __init__(self, proxy: xmlrpc.client.ServerProxy, url: str):
self._proxy = proxy
self._url = url
@classmethod
def connect(cls, url: str) -> XmlRpcMiddleware:
"""Create a connection to a GNU Radio XML-RPC server."""
transport = xmlrpc.client.Transport()
transport.timeout = XMLRPC_TIMEOUT
proxy = xmlrpc.client.ServerProxy(url, transport=transport)
# Verify connectivity
proxy.system.listMethods()
logger.info("Connected to XML-RPC at %s", url)
return cls(proxy, url)
def get_connection_info(
self, container_name: str | None = None, xmlrpc_port: int = 8080
) -> ConnectionInfoModel:
"""Return connection metadata including available methods."""
methods = self._list_methods()
return ConnectionInfoModel(
url=self._url,
container_name=container_name,
xmlrpc_port=xmlrpc_port,
methods=methods,
)
def _list_methods(self) -> list[str]:
"""List XML-RPC methods, filtering out system internals."""
try:
all_methods = self._proxy.system.listMethods()
return [m for m in all_methods if not m.startswith("system.")]
except Exception:
return []
def list_variables(self) -> list[VariableModel]:
"""Discover variables by introspecting get_* methods."""
methods = self._list_methods()
variables = []
for method in methods:
if method.startswith("get_"):
var_name = method[4:]
# Only include if there's a matching setter
if f"set_{var_name}" in methods:
try:
value = getattr(self._proxy, method)()
variables.append(VariableModel(name=var_name, value=value))
except Exception as e:
logger.warning("Failed to read %s: %s", var_name, e)
variables.append(VariableModel(name=var_name, value=None))
return variables
def get_variable(self, name: str) -> Any:
"""Get a variable value via XML-RPC."""
getter = getattr(self._proxy, f"get_{name}")
return getter()
def set_variable(self, name: str, value: Any) -> bool:
"""Set a variable value via XML-RPC."""
setter = getattr(self._proxy, f"set_{name}")
setter(value)
return True
def start(self) -> bool:
"""Start the flowgraph."""
self._proxy.start()
return True
def stop(self) -> bool:
"""Stop the flowgraph."""
self._proxy.stop()
return True
def lock(self) -> bool:
"""Lock the flowgraph for thread-safe parameter updates."""
self._proxy.lock()
return True
def unlock(self) -> bool:
"""Unlock the flowgraph after parameter updates."""
self._proxy.unlock()
return True
def close(self) -> None:
"""Close the XML-RPC connection (clears reference, GC handles socket)."""
self._proxy = None

View File

@ -114,3 +114,44 @@ class ErrorModel(BaseModel):
@classmethod
def transform_key(cls, v: KeyedModel) -> str:
return v.to_key()
# ──────────────────────────────────────────────
# Runtime Models (Phase 1: Docker + XML-RPC)
# ──────────────────────────────────────────────
class ContainerModel(BaseModel):
name: str
container_id: str
status: str
flowgraph_path: str
xmlrpc_port: int
vnc_port: int | None = None
device_paths: list[str] = []
class VariableModel(BaseModel):
name: str
value: Any
class ConnectionInfoModel(BaseModel):
url: str
container_name: str | None = None
xmlrpc_port: int
methods: list[str] = []
class ScreenshotModel(BaseModel):
container_name: str
image_base64: str
format: str = "png"
width: int | None = None
height: int | None = None
class RuntimeStatusModel(BaseModel):
connected: bool
connection: ConnectionInfoModel | None = None
containers: list[ContainerModel] = []

View File

@ -0,0 +1,66 @@
from __future__ import annotations
import logging
from fastmcp import FastMCP
from gnuradio_mcp.middlewares.docker import DockerMiddleware
from gnuradio_mcp.providers.runtime import RuntimeProvider
logger = logging.getLogger(__name__)
class McpRuntimeProvider:
"""Registers runtime control tools with FastMCP.
Docker is optional: if unavailable, container lifecycle and visual
feedback tools are skipped, but XML-RPC connection/control tools
are still registered (for connecting to externally-managed flowgraphs).
"""
def __init__(self, mcp_instance: FastMCP, runtime_provider: RuntimeProvider):
self._mcp = mcp_instance
self._provider = runtime_provider
self.__init_tools()
def __init_tools(self):
p = self._provider
# Connection management (always available)
self._mcp.tool(p.connect)
self._mcp.tool(p.disconnect)
self._mcp.tool(p.get_status)
# Variable control (always available)
self._mcp.tool(p.list_variables)
self._mcp.tool(p.get_variable)
self._mcp.tool(p.set_variable)
# Flowgraph execution (always available)
self._mcp.tool(p.start)
self._mcp.tool(p.stop)
self._mcp.tool(p.lock)
self._mcp.tool(p.unlock)
# Docker-dependent tools
if p._has_docker:
self._mcp.tool(p.launch_flowgraph)
self._mcp.tool(p.list_containers)
self._mcp.tool(p.stop_flowgraph)
self._mcp.tool(p.remove_flowgraph)
self._mcp.tool(p.connect_to_container)
self._mcp.tool(p.capture_screenshot)
self._mcp.tool(p.get_container_logs)
logger.info("Registered 17 runtime tools (Docker available)")
else:
logger.info(
"Registered 10 runtime tools (Docker unavailable, "
"container tools skipped)"
)
@classmethod
def create(cls, mcp_instance: FastMCP) -> McpRuntimeProvider:
"""Factory: create RuntimeProvider with optional Docker support."""
docker_mw = DockerMiddleware.create()
provider = RuntimeProvider(docker_mw=docker_mw)
return cls(mcp_instance, provider)

View File

@ -0,0 +1,216 @@
from __future__ import annotations
import logging
from typing import Any
from gnuradio_mcp.middlewares.docker import DockerMiddleware
from gnuradio_mcp.middlewares.xmlrpc import XmlRpcMiddleware
from gnuradio_mcp.models import (
ConnectionInfoModel,
ContainerModel,
RuntimeStatusModel,
ScreenshotModel,
VariableModel,
)
logger = logging.getLogger(__name__)
class RuntimeProvider:
"""Business logic for runtime flowgraph control.
Coordinates Docker (container lifecycle) and XML-RPC (variable control).
Tracks the active connection so convenience methods like get_variable()
work without repeating the URL each call.
"""
def __init__(
self,
docker_mw: DockerMiddleware | None = None,
):
self._docker = docker_mw
self._xmlrpc: XmlRpcMiddleware | None = None
self._active_container: str | None = None
@property
def _has_docker(self) -> bool:
return self._docker is not None
def _require_docker(self) -> DockerMiddleware:
if self._docker is None:
raise RuntimeError(
"Docker is not available. Install the 'docker' package "
"and ensure the Docker daemon is running."
)
return self._docker
def _require_xmlrpc(self) -> XmlRpcMiddleware:
if self._xmlrpc is None:
raise RuntimeError(
"Not connected to a flowgraph. Use connect() or "
"connect_to_container() first."
)
return self._xmlrpc
# ──────────────────────────────────────────
# Container Lifecycle
# ──────────────────────────────────────────
def launch_flowgraph(
self,
flowgraph_path: str,
name: str | None = None,
xmlrpc_port: int = 8080,
enable_vnc: bool = False,
device_paths: list[str] | None = None,
) -> ContainerModel:
"""Launch a flowgraph in a Docker container with Xvfb."""
docker = self._require_docker()
if name is None:
from pathlib import Path
name = f"gr-{Path(flowgraph_path).stem}"
return docker.launch(
flowgraph_path=flowgraph_path,
name=name,
xmlrpc_port=xmlrpc_port,
enable_vnc=enable_vnc,
device_paths=device_paths,
)
def list_containers(self) -> list[ContainerModel]:
"""List all gr-mcp managed containers."""
docker = self._require_docker()
return docker.list_containers()
def stop_flowgraph(self, name: str) -> bool:
"""Stop a running flowgraph container."""
docker = self._require_docker()
return docker.stop(name)
def remove_flowgraph(self, name: str, force: bool = False) -> bool:
"""Remove a flowgraph container."""
docker = self._require_docker()
return docker.remove(name, force=force)
# ──────────────────────────────────────────
# Connection Management
# ──────────────────────────────────────────
def connect(self, url: str) -> ConnectionInfoModel:
"""Connect to a GNU Radio XML-RPC endpoint."""
self._xmlrpc = XmlRpcMiddleware.connect(url)
self._active_container = None
# Parse port from URL
from urllib.parse import urlparse
parsed = urlparse(url)
port = parsed.port or 8080
return self._xmlrpc.get_connection_info(xmlrpc_port=port)
def connect_to_container(self, name: str) -> ConnectionInfoModel:
"""Connect to a flowgraph by container name (resolves port automatically)."""
docker = self._require_docker()
port = docker.get_xmlrpc_port(name)
url = f"http://localhost:{port}"
self._xmlrpc = XmlRpcMiddleware.connect(url)
self._active_container = name
return self._xmlrpc.get_connection_info(
container_name=name, xmlrpc_port=port
)
def disconnect(self) -> bool:
"""Disconnect from the current XML-RPC endpoint."""
if self._xmlrpc is not None:
self._xmlrpc.close()
self._xmlrpc = None
self._active_container = None
return True
def get_status(self) -> RuntimeStatusModel:
"""Get runtime status including connection and container info."""
connection = None
if self._xmlrpc is not None:
from urllib.parse import urlparse
parsed = urlparse(self._xmlrpc._url)
port = parsed.port or 8080
connection = self._xmlrpc.get_connection_info(
container_name=self._active_container, xmlrpc_port=port
)
containers = []
if self._has_docker:
try:
containers = self._docker.list_containers() # type: ignore[union-attr]
except Exception as e:
logger.warning("Failed to list containers: %s", e)
return RuntimeStatusModel(
connected=self._xmlrpc is not None,
connection=connection,
containers=containers,
)
# ──────────────────────────────────────────
# Variable Control
# ──────────────────────────────────────────
def list_variables(self) -> list[VariableModel]:
"""List all XML-RPC-exposed variables."""
xmlrpc = self._require_xmlrpc()
return xmlrpc.list_variables()
def get_variable(self, name: str) -> Any:
"""Get a variable value."""
xmlrpc = self._require_xmlrpc()
return xmlrpc.get_variable(name)
def set_variable(self, name: str, value: Any) -> bool:
"""Set a variable value."""
xmlrpc = self._require_xmlrpc()
return xmlrpc.set_variable(name, value)
# ──────────────────────────────────────────
# Flowgraph Execution Control
# ──────────────────────────────────────────
def start(self) -> bool:
"""Start the connected flowgraph."""
return self._require_xmlrpc().start()
def stop(self) -> bool:
"""Stop the connected flowgraph."""
return self._require_xmlrpc().stop()
def lock(self) -> bool:
"""Lock the flowgraph for thread-safe parameter updates."""
return self._require_xmlrpc().lock()
def unlock(self) -> bool:
"""Unlock the flowgraph after parameter updates."""
return self._require_xmlrpc().unlock()
# ──────────────────────────────────────────
# Visual Feedback
# ──────────────────────────────────────────
def capture_screenshot(self, name: str | None = None) -> ScreenshotModel:
"""Capture a screenshot of the flowgraph's QT GUI."""
docker = self._require_docker()
container_name = name or self._active_container
if container_name is None:
raise RuntimeError(
"No container specified. Provide a name or connect to a container first."
)
return docker.capture_screenshot(container_name)
def get_container_logs(self, name: str | None = None, tail: int = 100) -> str:
"""Get logs from a flowgraph container."""
docker = self._require_docker()
container_name = name or self._active_container
if container_name is None:
raise RuntimeError(
"No container specified. Provide a name or connect to a container first."
)
return docker.get_logs(container_name, tail=tail)

View File

@ -0,0 +1,244 @@
"""Unit tests for DockerMiddleware with mocked Docker client."""
from unittest.mock import MagicMock, patch
import pytest
from gnuradio_mcp.middlewares.docker import (
CONTAINER_FLOWGRAPH_DIR,
DEFAULT_XMLRPC_PORT,
DockerMiddleware,
)
from gnuradio_mcp.models import ContainerModel, ScreenshotModel
@pytest.fixture
def mock_docker_client():
return MagicMock()
@pytest.fixture
def docker_mw(mock_docker_client):
return DockerMiddleware(mock_docker_client)
class TestDockerMiddlewareCreate:
def test_create_returns_none_when_docker_unavailable(self):
with patch(
"gnuradio_mcp.middlewares.docker.docker",
create=True,
) as mock_mod:
mock_mod.from_env.side_effect = Exception("No Docker")
# We need to patch the import inside create()
with patch.dict("sys.modules", {"docker": mock_mod}):
result = DockerMiddleware.create()
assert result is None
def test_create_returns_middleware_when_docker_available(self):
mock_mod = MagicMock()
mock_client = MagicMock()
mock_mod.from_env.return_value = mock_client
with patch.dict("sys.modules", {"docker": mock_mod}):
result = DockerMiddleware.create()
assert result is not None
mock_client.ping.assert_called_once()
class TestLaunch:
def test_launch_creates_container(self, docker_mw, mock_docker_client, tmp_path):
fg_file = tmp_path / "test.grc"
fg_file.write_text("<flowgraph/>")
mock_container = MagicMock()
mock_container.id = "abc123def456"
mock_docker_client.containers.run.return_value = mock_container
result = docker_mw.launch(
flowgraph_path=str(fg_file),
name="test-fg",
xmlrpc_port=8080,
)
assert isinstance(result, ContainerModel)
assert result.name == "test-fg"
assert result.container_id == "abc123def456"
assert result.status == "running"
assert result.xmlrpc_port == 8080
mock_docker_client.containers.run.assert_called_once()
call_kwargs = mock_docker_client.containers.run.call_args
assert call_kwargs.kwargs["name"] == "test-fg"
assert call_kwargs.kwargs["detach"] is True
def test_launch_raises_on_missing_file(self, docker_mw):
with pytest.raises(FileNotFoundError):
docker_mw.launch(
flowgraph_path="/nonexistent/file.grc",
name="test",
)
def test_launch_with_vnc(self, docker_mw, mock_docker_client, tmp_path):
fg_file = tmp_path / "test.grc"
fg_file.write_text("<flowgraph/>")
mock_container = MagicMock()
mock_container.id = "abc123def456"
mock_docker_client.containers.run.return_value = mock_container
result = docker_mw.launch(
flowgraph_path=str(fg_file),
name="test-vnc",
enable_vnc=True,
)
assert result.vnc_port == 5900
# Verify VNC label is set
call_kwargs = mock_docker_client.containers.run.call_args
assert call_kwargs.kwargs["labels"]["gr-mcp.vnc-enabled"] == "1"
def test_launch_without_vnc_sets_label(self, docker_mw, mock_docker_client, tmp_path):
fg_file = tmp_path / "test.grc"
fg_file.write_text("<flowgraph/>")
mock_container = MagicMock()
mock_container.id = "abc123def456"
mock_docker_client.containers.run.return_value = mock_container
result = docker_mw.launch(
flowgraph_path=str(fg_file),
name="test-no-vnc",
enable_vnc=False,
)
assert result.vnc_port is None
# Verify VNC label is explicitly set to "0"
call_kwargs = mock_docker_client.containers.run.call_args
assert call_kwargs.kwargs["labels"]["gr-mcp.vnc-enabled"] == "0"
def test_launch_with_devices(self, docker_mw, mock_docker_client, tmp_path):
fg_file = tmp_path / "test.grc"
fg_file.write_text("<flowgraph/>")
mock_container = MagicMock()
mock_container.id = "abc123def456"
mock_docker_client.containers.run.return_value = mock_container
result = docker_mw.launch(
flowgraph_path=str(fg_file),
name="test-sdr",
device_paths=["/dev/bus/usb/001/002"],
)
assert result.device_paths == ["/dev/bus/usb/001/002"]
call_kwargs = mock_docker_client.containers.run.call_args
assert "/dev/bus/usb/001/002:/dev/bus/usb/001/002:rwm" in call_kwargs.kwargs["devices"]
class TestListContainers:
def test_list_containers(self, docker_mw, mock_docker_client):
mock_c = MagicMock()
mock_c.name = "gr-test"
mock_c.id = "abc123def456"
mock_c.status = "running"
mock_c.labels = {
"gr-mcp.flowgraph": "/path/to/test.grc",
"gr-mcp.xmlrpc-port": "8080",
"gr-mcp.vnc-enabled": "0",
}
mock_docker_client.containers.list.return_value = [mock_c]
result = docker_mw.list_containers()
assert len(result) == 1
assert result[0].name == "gr-test"
assert result[0].flowgraph_path == "/path/to/test.grc"
assert result[0].vnc_port is None # VNC not enabled
def test_list_containers_with_vnc(self, docker_mw, mock_docker_client):
mock_c = MagicMock()
mock_c.name = "gr-test-vnc"
mock_c.id = "abc123def456"
mock_c.status = "running"
mock_c.labels = {
"gr-mcp.flowgraph": "/path/to/test.grc",
"gr-mcp.xmlrpc-port": "8080",
"gr-mcp.vnc-enabled": "1",
}
mock_docker_client.containers.list.return_value = [mock_c]
result = docker_mw.list_containers()
assert len(result) == 1
assert result[0].vnc_port == 5900 # VNC enabled
def test_list_containers_empty(self, docker_mw, mock_docker_client):
mock_docker_client.containers.list.return_value = []
result = docker_mw.list_containers()
assert result == []
class TestStopRemove:
def test_stop(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.stop("test") is True
mock_container.stop.assert_called_once_with(timeout=10)
def test_remove(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.remove("test") is True
mock_container.remove.assert_called_once_with(force=False)
def test_remove_force(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.remove("test", force=True) is True
mock_container.remove.assert_called_once_with(force=True)
class TestLogs:
def test_get_logs(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_container.logs.return_value = b"flowgraph started\n"
mock_docker_client.containers.get.return_value = mock_container
result = docker_mw.get_logs("test", tail=50)
assert "flowgraph started" in result
mock_container.logs.assert_called_once_with(tail=50)
class TestScreenshot:
def test_capture_screenshot(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
# Simulate PNG bytes
mock_container.exec_run.return_value = (0, b"\x89PNG\r\n\x1a\n")
mock_docker_client.containers.get.return_value = mock_container
result = docker_mw.capture_screenshot("test")
assert isinstance(result, ScreenshotModel)
assert result.container_name == "test"
assert result.format == "png"
assert len(result.image_base64) > 0
def test_capture_screenshot_failure(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_container.exec_run.return_value = (1, b"error: no display")
mock_docker_client.containers.get.return_value = mock_container
with pytest.raises(RuntimeError, match="Screenshot failed"):
docker_mw.capture_screenshot("test")
class TestGetXmlRpcPort:
def test_get_port_from_label(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_container.labels = {"gr-mcp.xmlrpc-port": "9090"}
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.get_xmlrpc_port("test") == 9090
def test_get_default_port(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_container.labels = {}
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.get_xmlrpc_port("test") == DEFAULT_XMLRPC_PORT

View File

@ -0,0 +1,118 @@
"""Unit tests for XmlRpcMiddleware with mocked ServerProxy."""
from unittest.mock import MagicMock, patch
import pytest
from gnuradio_mcp.middlewares.xmlrpc import XmlRpcMiddleware
from gnuradio_mcp.models import ConnectionInfoModel, VariableModel
@pytest.fixture
def mock_proxy():
proxy = MagicMock()
proxy.system.listMethods.return_value = [
"system.listMethods",
"system.methodHelp",
"get_frequency",
"set_frequency",
"get_amplitude",
"set_amplitude",
"get_waveform",
"start",
"stop",
"lock",
"unlock",
]
return proxy
@pytest.fixture
def xmlrpc_mw(mock_proxy):
return XmlRpcMiddleware(mock_proxy, "http://localhost:8080")
class TestConnect:
def test_connect_success(self):
with patch("gnuradio_mcp.middlewares.xmlrpc.xmlrpc.client") as mock_xmlrpc:
mock_proxy = MagicMock()
mock_xmlrpc.ServerProxy.return_value = mock_proxy
mock_xmlrpc.Transport.return_value = MagicMock()
mw = XmlRpcMiddleware.connect("http://localhost:8080")
assert mw is not None
mock_proxy.system.listMethods.assert_called_once()
def test_connect_failure(self):
with patch("gnuradio_mcp.middlewares.xmlrpc.xmlrpc.client") as mock_xmlrpc:
mock_proxy = MagicMock()
mock_proxy.system.listMethods.side_effect = ConnectionRefusedError()
mock_xmlrpc.ServerProxy.return_value = mock_proxy
mock_xmlrpc.Transport.return_value = MagicMock()
with pytest.raises(ConnectionRefusedError):
XmlRpcMiddleware.connect("http://localhost:8080")
class TestConnectionInfo:
def test_get_connection_info(self, xmlrpc_mw, mock_proxy):
result = xmlrpc_mw.get_connection_info(container_name="test", xmlrpc_port=8080)
assert isinstance(result, ConnectionInfoModel)
assert result.url == "http://localhost:8080"
assert result.container_name == "test"
# Should exclude system.* methods
assert "system.listMethods" not in result.methods
assert "get_frequency" in result.methods
class TestListVariables:
def test_list_variables(self, xmlrpc_mw, mock_proxy):
mock_proxy.get_frequency.return_value = 1e6
mock_proxy.get_amplitude.return_value = 0.5
result = xmlrpc_mw.list_variables()
assert len(result) == 2
assert all(isinstance(v, VariableModel) for v in result)
names = {v.name for v in result}
assert "frequency" in names
assert "amplitude" in names
# waveform has get_ but no set_, should be excluded
assert "waveform" not in names
def test_list_variables_with_error(self, xmlrpc_mw, mock_proxy):
"""If get_* fails, variable should still appear with None value."""
mock_proxy.get_frequency.side_effect = Exception("timeout")
mock_proxy.get_amplitude.return_value = 0.5
result = xmlrpc_mw.list_variables()
freq_var = next(v for v in result if v.name == "frequency")
assert freq_var.value is None
class TestGetSetVariable:
def test_get_variable(self, xmlrpc_mw, mock_proxy):
mock_proxy.get_frequency.return_value = 1e6
assert xmlrpc_mw.get_variable("frequency") == 1e6
def test_set_variable(self, xmlrpc_mw, mock_proxy):
assert xmlrpc_mw.set_variable("frequency", 2e6) is True
mock_proxy.set_frequency.assert_called_once_with(2e6)
class TestFlowgraphControl:
def test_start(self, xmlrpc_mw, mock_proxy):
assert xmlrpc_mw.start() is True
mock_proxy.start.assert_called_once()
def test_stop(self, xmlrpc_mw, mock_proxy):
assert xmlrpc_mw.stop() is True
mock_proxy.stop.assert_called_once()
def test_lock(self, xmlrpc_mw, mock_proxy):
assert xmlrpc_mw.lock() is True
mock_proxy.lock.assert_called_once()
def test_unlock(self, xmlrpc_mw, mock_proxy):
assert xmlrpc_mw.unlock() is True
mock_proxy.unlock.assert_called_once()

20
uv.lock generated
View File

@ -257,6 +257,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ba/5a/18ad964b0086c6e62e2e7500f7edc89e3faa45033c71c1893d34eed2b2de/dnspython-2.8.0-py3-none-any.whl", hash = "sha256:01d9bbc4a2d76bf0db7c1f729812ded6d912bd318d3b1cf81d30c0f845dbf3af", size = 331094, upload-time = "2025-09-07T18:57:58.071Z" },
]
[[package]]
name = "docker"
version = "7.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pywin32", marker = "sys_platform == 'win32'" },
{ name = "requests" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/91/9b/4a2ea29aeba62471211598dac5d96825bb49348fa07e906ea930394a83ce/docker-7.1.0.tar.gz", hash = "sha256:ad8c70e6e3f8926cb8a92619b832b4ea5299e2831c14284663184e200546fa6c", size = 117834, upload-time = "2024-05-23T11:13:57.216Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e3/26/57c6fb270950d476074c087527a558ccb6f4436657314bfb6cdf484114c4/docker-7.1.0-py3-none-any.whl", hash = "sha256:c96b93b7f0a746f9e77d325bcfb87422a3d8bd4f03136ae8a85b37f1898d5fc0", size = 147774, upload-time = "2024-05-23T11:13:55.01Z" },
]
[[package]]
name = "docstring-parser"
version = "0.17.0"
@ -353,9 +367,13 @@ dev = [
{ name = "pytest" },
{ name = "pytest-asyncio" },
]
runtime = [
{ name = "docker" },
]
[package.metadata]
requires-dist = [
{ name = "docker", marker = "extra == 'runtime'", specifier = ">=7.0" },
{ name = "fastmcp", specifier = ">=3.0.0b1" },
{ name = "mako", specifier = ">=1.3" },
{ name = "pre-commit", marker = "extra == 'dev'", specifier = ">=4.5" },
@ -364,7 +382,7 @@ requires-dist = [
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=1.3" },
{ name = "pyyaml", specifier = ">=6.0" },
]
provides-extras = ["dev"]
provides-extras = ["runtime", "dev"]
[[package]]
name = "h11"