tests: add RuntimeProvider coverage (82% → 94%)
Unit tests (35): - Mocked DockerMiddleware and XmlRpcMiddleware - Full coverage of orchestration logic, preconditions, state tracking Integration tests (6): - Real Docker containers with gnuradio-runtime image - End-to-end: launch → connect → control variables → stop - Skip gracefully when Docker/image unavailable Total: 112 tests, 94% coverage
This commit is contained in:
parent
2084c41228
commit
91a442cdf9
332
tests/integration/test_runtime_docker.py
Normal file
332
tests/integration/test_runtime_docker.py
Normal file
@ -0,0 +1,332 @@
|
||||
"""Integration tests for RuntimeProvider with real Docker.
|
||||
|
||||
These tests require:
|
||||
1. Docker daemon running
|
||||
2. gnuradio-runtime:latest image built (or tests will skip)
|
||||
|
||||
Run with: pytest tests/integration/test_runtime_docker.py -v
|
||||
"""
|
||||
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Check if Docker is available
|
||||
try:
|
||||
import docker
|
||||
|
||||
_docker_client = docker.from_env()
|
||||
_docker_client.ping()
|
||||
DOCKER_AVAILABLE = True
|
||||
except Exception:
|
||||
DOCKER_AVAILABLE = False
|
||||
|
||||
# Check if runtime image exists
|
||||
RUNTIME_IMAGE = "gnuradio-runtime:latest"
|
||||
RUNTIME_IMAGE_EXISTS = False
|
||||
if DOCKER_AVAILABLE:
|
||||
try:
|
||||
_docker_client.images.get(RUNTIME_IMAGE)
|
||||
RUNTIME_IMAGE_EXISTS = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.skipif(not DOCKER_AVAILABLE, reason="Docker not available"),
|
||||
pytest.mark.skipif(
|
||||
not RUNTIME_IMAGE_EXISTS,
|
||||
reason=f"Runtime image '{RUNTIME_IMAGE}' not built. "
|
||||
"Run: docker build -t gnuradio-runtime -f docker/Dockerfile.gnuradio-runtime docker/",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def docker_client():
|
||||
"""Real Docker client."""
|
||||
return docker.from_env()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cleanup_containers(docker_client):
|
||||
"""Cleanup any test containers after each test."""
|
||||
created_containers = []
|
||||
|
||||
yield created_containers
|
||||
|
||||
# Cleanup
|
||||
for name in created_containers:
|
||||
try:
|
||||
container = docker_client.containers.get(name)
|
||||
container.stop(timeout=5)
|
||||
container.remove(force=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_flowgraph(tmp_path) -> Path:
|
||||
"""Create a minimal Python flowgraph for testing.
|
||||
|
||||
This creates a simple Python script that mimics a GNU Radio flowgraph
|
||||
with XML-RPC server (for testing without requiring a real .grc file).
|
||||
"""
|
||||
fg_path = tmp_path / "test_flowgraph.py"
|
||||
fg_path.write_text(
|
||||
'''\
|
||||
#!/usr/bin/env python3
|
||||
"""Minimal test flowgraph with XML-RPC server."""
|
||||
|
||||
import os
|
||||
import time
|
||||
from xmlrpc.server import SimpleXMLRPCServer
|
||||
import threading
|
||||
|
||||
# Configurable via environment
|
||||
XMLRPC_PORT = int(os.environ.get("XMLRPC_PORT", 8080))
|
||||
|
||||
# Simulated flowgraph variables
|
||||
_variables = {
|
||||
"frequency": 1e6,
|
||||
"amplitude": 0.5,
|
||||
"running": False,
|
||||
}
|
||||
|
||||
|
||||
def get_frequency():
|
||||
return _variables["frequency"]
|
||||
|
||||
|
||||
def set_frequency(val):
|
||||
_variables["frequency"] = float(val)
|
||||
|
||||
|
||||
def get_amplitude():
|
||||
return _variables["amplitude"]
|
||||
|
||||
|
||||
def set_amplitude(val):
|
||||
_variables["amplitude"] = float(val)
|
||||
|
||||
|
||||
def start():
|
||||
_variables["running"] = True
|
||||
print("Flowgraph started")
|
||||
|
||||
|
||||
def stop():
|
||||
_variables["running"] = False
|
||||
print("Flowgraph stopped")
|
||||
|
||||
|
||||
def lock():
|
||||
print("Flowgraph locked")
|
||||
|
||||
|
||||
def unlock():
|
||||
print("Flowgraph unlocked")
|
||||
|
||||
|
||||
def main():
|
||||
server = SimpleXMLRPCServer(("0.0.0.0", XMLRPC_PORT), allow_none=True)
|
||||
server.register_introspection_functions() # Enable system.listMethods()
|
||||
server.register_function(get_frequency)
|
||||
server.register_function(set_frequency)
|
||||
server.register_function(get_amplitude)
|
||||
server.register_function(set_amplitude)
|
||||
server.register_function(start)
|
||||
server.register_function(stop)
|
||||
server.register_function(lock)
|
||||
server.register_function(unlock)
|
||||
|
||||
print(f"XML-RPC server listening on port {XMLRPC_PORT}")
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
'''
|
||||
)
|
||||
return fg_path
|
||||
|
||||
|
||||
class TestDockerMiddlewareIntegration:
|
||||
"""Test DockerMiddleware with real Docker."""
|
||||
|
||||
def test_create_returns_middleware(self):
|
||||
from gnuradio_mcp.middlewares.docker import DockerMiddleware
|
||||
|
||||
mw = DockerMiddleware.create()
|
||||
assert mw is not None
|
||||
|
||||
def test_list_containers_empty_initially(self):
|
||||
from gnuradio_mcp.middlewares.docker import DockerMiddleware
|
||||
|
||||
mw = DockerMiddleware.create()
|
||||
# Filter to only our test containers
|
||||
containers = [c for c in mw.list_containers() if c.name.startswith("gr-test-")]
|
||||
# May or may not be empty depending on previous test runs
|
||||
assert isinstance(containers, list)
|
||||
|
||||
|
||||
class TestRuntimeProviderIntegration:
|
||||
"""Test RuntimeProvider with real Docker (requires runtime image)."""
|
||||
|
||||
def test_launch_and_stop_flowgraph(
|
||||
self, test_flowgraph, cleanup_containers
|
||||
):
|
||||
from gnuradio_mcp.middlewares.docker import DockerMiddleware
|
||||
from gnuradio_mcp.providers.runtime import RuntimeProvider
|
||||
|
||||
mw = DockerMiddleware.create()
|
||||
provider = RuntimeProvider(docker_mw=mw)
|
||||
|
||||
container_name = f"gr-test-{int(time.time())}"
|
||||
cleanup_containers.append(container_name)
|
||||
|
||||
# Launch
|
||||
result = provider.launch_flowgraph(
|
||||
flowgraph_path=str(test_flowgraph),
|
||||
name=container_name,
|
||||
xmlrpc_port=18080, # Use high port to avoid conflicts
|
||||
)
|
||||
|
||||
assert result.name == container_name
|
||||
assert result.status == "running"
|
||||
assert result.xmlrpc_port == 18080
|
||||
|
||||
# Wait for container to start
|
||||
time.sleep(2)
|
||||
|
||||
# Verify in list
|
||||
containers = provider.list_containers()
|
||||
names = [c.name for c in containers]
|
||||
assert container_name in names
|
||||
|
||||
# Stop
|
||||
assert provider.stop_flowgraph(container_name) is True
|
||||
|
||||
# Remove
|
||||
assert provider.remove_flowgraph(container_name) is True
|
||||
|
||||
# Remove from cleanup list since we already removed it
|
||||
cleanup_containers.remove(container_name)
|
||||
|
||||
def test_launch_connect_and_control(self, test_flowgraph, cleanup_containers):
|
||||
"""Full integration: launch container, connect via XML-RPC, control variables."""
|
||||
from gnuradio_mcp.middlewares.docker import DockerMiddleware
|
||||
from gnuradio_mcp.providers.runtime import RuntimeProvider
|
||||
|
||||
mw = DockerMiddleware.create()
|
||||
provider = RuntimeProvider(docker_mw=mw)
|
||||
|
||||
container_name = f"gr-test-{int(time.time())}"
|
||||
cleanup_containers.append(container_name)
|
||||
|
||||
# Launch with specific port
|
||||
xmlrpc_port = 18081
|
||||
provider.launch_flowgraph(
|
||||
flowgraph_path=str(test_flowgraph),
|
||||
name=container_name,
|
||||
xmlrpc_port=xmlrpc_port,
|
||||
)
|
||||
|
||||
# Wait for XML-RPC server to be ready
|
||||
time.sleep(3)
|
||||
|
||||
try:
|
||||
# Connect
|
||||
connection = provider.connect(f"http://localhost:{xmlrpc_port}")
|
||||
assert connection.url == f"http://localhost:{xmlrpc_port}"
|
||||
assert "get_frequency" in connection.methods
|
||||
|
||||
# List variables
|
||||
variables = provider.list_variables()
|
||||
var_names = [v.name for v in variables]
|
||||
assert "frequency" in var_names
|
||||
assert "amplitude" in var_names
|
||||
|
||||
# Get/set variable
|
||||
freq = provider.get_variable("frequency")
|
||||
assert freq == 1e6
|
||||
|
||||
provider.set_variable("frequency", 2e6)
|
||||
new_freq = provider.get_variable("frequency")
|
||||
assert new_freq == 2e6
|
||||
|
||||
# Flowgraph control
|
||||
assert provider.start() is True
|
||||
assert provider.lock() is True
|
||||
assert provider.unlock() is True
|
||||
assert provider.stop() is True
|
||||
|
||||
# Disconnect
|
||||
provider.disconnect()
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
provider.stop_flowgraph(container_name)
|
||||
provider.remove_flowgraph(container_name, force=True)
|
||||
cleanup_containers.remove(container_name)
|
||||
|
||||
def test_get_container_logs(self, test_flowgraph, cleanup_containers):
|
||||
"""Test retrieving container logs."""
|
||||
from gnuradio_mcp.middlewares.docker import DockerMiddleware
|
||||
from gnuradio_mcp.providers.runtime import RuntimeProvider
|
||||
|
||||
mw = DockerMiddleware.create()
|
||||
provider = RuntimeProvider(docker_mw=mw)
|
||||
|
||||
container_name = f"gr-test-logs-{int(time.time())}"
|
||||
cleanup_containers.append(container_name)
|
||||
|
||||
provider.launch_flowgraph(
|
||||
flowgraph_path=str(test_flowgraph),
|
||||
name=container_name,
|
||||
xmlrpc_port=18082,
|
||||
)
|
||||
|
||||
# Wait for startup
|
||||
time.sleep(2)
|
||||
|
||||
try:
|
||||
logs = provider.get_container_logs(container_name, tail=50)
|
||||
# Should contain startup message from our test flowgraph
|
||||
assert "XML-RPC server listening" in logs or "Xvfb" in logs
|
||||
|
||||
finally:
|
||||
provider.stop_flowgraph(container_name)
|
||||
provider.remove_flowgraph(container_name, force=True)
|
||||
cleanup_containers.remove(container_name)
|
||||
|
||||
def test_status_shows_running_container(self, test_flowgraph, cleanup_containers):
|
||||
"""Test get_status includes running containers."""
|
||||
from gnuradio_mcp.middlewares.docker import DockerMiddleware
|
||||
from gnuradio_mcp.providers.runtime import RuntimeProvider
|
||||
|
||||
mw = DockerMiddleware.create()
|
||||
provider = RuntimeProvider(docker_mw=mw)
|
||||
|
||||
container_name = f"gr-test-status-{int(time.time())}"
|
||||
cleanup_containers.append(container_name)
|
||||
|
||||
provider.launch_flowgraph(
|
||||
flowgraph_path=str(test_flowgraph),
|
||||
name=container_name,
|
||||
xmlrpc_port=18083,
|
||||
)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
status = provider.get_status()
|
||||
assert status.connected is False # Not connected yet
|
||||
container_names = [c.name for c in status.containers]
|
||||
assert container_name in container_names
|
||||
|
||||
finally:
|
||||
provider.stop_flowgraph(container_name)
|
||||
provider.remove_flowgraph(container_name, force=True)
|
||||
cleanup_containers.remove(container_name)
|
||||
326
tests/unit/test_runtime_provider.py
Normal file
326
tests/unit/test_runtime_provider.py
Normal file
@ -0,0 +1,326 @@
|
||||
"""Unit tests for RuntimeProvider with mocked middlewares."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gnuradio_mcp.models import (
|
||||
ConnectionInfoModel,
|
||||
ContainerModel,
|
||||
RuntimeStatusModel,
|
||||
ScreenshotModel,
|
||||
VariableModel,
|
||||
)
|
||||
from gnuradio_mcp.providers.runtime import RuntimeProvider
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_docker_mw():
|
||||
"""Mock DockerMiddleware."""
|
||||
mw = MagicMock()
|
||||
mw.launch.return_value = ContainerModel(
|
||||
name="gr-test",
|
||||
container_id="abc123",
|
||||
status="running",
|
||||
flowgraph_path="/path/to/test.grc",
|
||||
xmlrpc_port=8080,
|
||||
)
|
||||
mw.list_containers.return_value = [
|
||||
ContainerModel(
|
||||
name="gr-test",
|
||||
container_id="abc123",
|
||||
status="running",
|
||||
flowgraph_path="/path/to/test.grc",
|
||||
xmlrpc_port=8080,
|
||||
)
|
||||
]
|
||||
mw.stop.return_value = True
|
||||
mw.remove.return_value = True
|
||||
mw.get_xmlrpc_port.return_value = 8080
|
||||
mw.capture_screenshot.return_value = ScreenshotModel(
|
||||
container_name="gr-test",
|
||||
image_base64="iVBORw0KGgo=",
|
||||
format="png",
|
||||
)
|
||||
mw.get_logs.return_value = "flowgraph started\n"
|
||||
return mw
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_xmlrpc_mw():
|
||||
"""Mock XmlRpcMiddleware."""
|
||||
mw = MagicMock()
|
||||
mw._url = "http://localhost:8080"
|
||||
mw.get_connection_info.return_value = ConnectionInfoModel(
|
||||
url="http://localhost:8080",
|
||||
xmlrpc_port=8080,
|
||||
methods=["get_freq", "set_freq"],
|
||||
)
|
||||
mw.list_variables.return_value = [
|
||||
VariableModel(name="freq", value=1e6),
|
||||
VariableModel(name="amp", value=0.5),
|
||||
]
|
||||
mw.get_variable.return_value = 1e6
|
||||
mw.set_variable.return_value = True
|
||||
mw.start.return_value = True
|
||||
mw.stop.return_value = True
|
||||
mw.lock.return_value = True
|
||||
mw.unlock.return_value = True
|
||||
return mw
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def provider_with_docker(mock_docker_mw):
|
||||
"""RuntimeProvider with Docker available."""
|
||||
return RuntimeProvider(docker_mw=mock_docker_mw)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def provider_no_docker():
|
||||
"""RuntimeProvider without Docker."""
|
||||
return RuntimeProvider(docker_mw=None)
|
||||
|
||||
|
||||
class TestInitialization:
|
||||
def test_has_docker_true(self, provider_with_docker):
|
||||
assert provider_with_docker._has_docker is True
|
||||
|
||||
def test_has_docker_false(self, provider_no_docker):
|
||||
assert provider_no_docker._has_docker is False
|
||||
|
||||
def test_initial_state(self, provider_with_docker):
|
||||
assert provider_with_docker._xmlrpc is None
|
||||
assert provider_with_docker._active_container is None
|
||||
|
||||
|
||||
class TestPreconditions:
|
||||
def test_require_docker_raises_without_docker(self, provider_no_docker):
|
||||
with pytest.raises(RuntimeError, match="Docker is not available"):
|
||||
provider_no_docker._require_docker()
|
||||
|
||||
def test_require_docker_returns_middleware(self, provider_with_docker, mock_docker_mw):
|
||||
result = provider_with_docker._require_docker()
|
||||
assert result is mock_docker_mw
|
||||
|
||||
def test_require_xmlrpc_raises_when_not_connected(self, provider_with_docker):
|
||||
with pytest.raises(RuntimeError, match="Not connected"):
|
||||
provider_with_docker._require_xmlrpc()
|
||||
|
||||
|
||||
class TestContainerLifecycle:
|
||||
def test_launch_flowgraph(self, provider_with_docker, mock_docker_mw, tmp_path):
|
||||
fg = tmp_path / "test.grc"
|
||||
fg.write_text("<flowgraph/>")
|
||||
|
||||
result = provider_with_docker.launch_flowgraph(
|
||||
flowgraph_path=str(fg),
|
||||
name="my-fg",
|
||||
xmlrpc_port=9090,
|
||||
enable_vnc=True,
|
||||
)
|
||||
|
||||
assert isinstance(result, ContainerModel)
|
||||
mock_docker_mw.launch.assert_called_once_with(
|
||||
flowgraph_path=str(fg),
|
||||
name="my-fg",
|
||||
xmlrpc_port=9090,
|
||||
enable_vnc=True,
|
||||
device_paths=None,
|
||||
)
|
||||
|
||||
def test_launch_flowgraph_auto_name(self, provider_with_docker, mock_docker_mw, tmp_path):
|
||||
fg = tmp_path / "siggen_xmlrpc.grc"
|
||||
fg.write_text("<flowgraph/>")
|
||||
|
||||
provider_with_docker.launch_flowgraph(flowgraph_path=str(fg))
|
||||
|
||||
call_kwargs = mock_docker_mw.launch.call_args
|
||||
assert call_kwargs.kwargs["name"] == "gr-siggen_xmlrpc"
|
||||
|
||||
def test_launch_flowgraph_requires_docker(self, provider_no_docker, tmp_path):
|
||||
fg = tmp_path / "test.grc"
|
||||
fg.write_text("<flowgraph/>")
|
||||
|
||||
with pytest.raises(RuntimeError, match="Docker is not available"):
|
||||
provider_no_docker.launch_flowgraph(str(fg))
|
||||
|
||||
def test_list_containers(self, provider_with_docker, mock_docker_mw):
|
||||
result = provider_with_docker.list_containers()
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "gr-test"
|
||||
mock_docker_mw.list_containers.assert_called_once()
|
||||
|
||||
def test_stop_flowgraph(self, provider_with_docker, mock_docker_mw):
|
||||
result = provider_with_docker.stop_flowgraph("gr-test")
|
||||
assert result is True
|
||||
mock_docker_mw.stop.assert_called_once_with("gr-test")
|
||||
|
||||
def test_remove_flowgraph(self, provider_with_docker, mock_docker_mw):
|
||||
result = provider_with_docker.remove_flowgraph("gr-test", force=True)
|
||||
assert result is True
|
||||
mock_docker_mw.remove.assert_called_once_with("gr-test", force=True)
|
||||
|
||||
|
||||
class TestConnectionManagement:
|
||||
def test_connect(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
with patch(
|
||||
"gnuradio_mcp.providers.runtime.XmlRpcMiddleware.connect",
|
||||
return_value=mock_xmlrpc_mw,
|
||||
):
|
||||
result = provider_with_docker.connect("http://localhost:8080")
|
||||
|
||||
assert isinstance(result, ConnectionInfoModel)
|
||||
assert provider_with_docker._xmlrpc is mock_xmlrpc_mw
|
||||
assert provider_with_docker._active_container is None
|
||||
|
||||
def test_connect_parses_port(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
with patch(
|
||||
"gnuradio_mcp.providers.runtime.XmlRpcMiddleware.connect",
|
||||
return_value=mock_xmlrpc_mw,
|
||||
):
|
||||
provider_with_docker.connect("http://localhost:9090")
|
||||
mock_xmlrpc_mw.get_connection_info.assert_called_with(xmlrpc_port=9090)
|
||||
|
||||
def test_connect_to_container(self, provider_with_docker, mock_docker_mw, mock_xmlrpc_mw):
|
||||
with patch(
|
||||
"gnuradio_mcp.providers.runtime.XmlRpcMiddleware.connect",
|
||||
return_value=mock_xmlrpc_mw,
|
||||
):
|
||||
result = provider_with_docker.connect_to_container("gr-test")
|
||||
|
||||
assert isinstance(result, ConnectionInfoModel)
|
||||
assert provider_with_docker._active_container == "gr-test"
|
||||
mock_docker_mw.get_xmlrpc_port.assert_called_once_with("gr-test")
|
||||
|
||||
def test_disconnect(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
provider_with_docker._active_container = "gr-test"
|
||||
|
||||
result = provider_with_docker.disconnect()
|
||||
|
||||
assert result is True
|
||||
assert provider_with_docker._xmlrpc is None
|
||||
assert provider_with_docker._active_container is None
|
||||
mock_xmlrpc_mw.close.assert_called_once()
|
||||
|
||||
def test_disconnect_when_not_connected(self, provider_with_docker):
|
||||
result = provider_with_docker.disconnect()
|
||||
assert result is True # Should be idempotent
|
||||
|
||||
def test_get_status_not_connected(self, provider_with_docker, mock_docker_mw):
|
||||
result = provider_with_docker.get_status()
|
||||
|
||||
assert isinstance(result, RuntimeStatusModel)
|
||||
assert result.connected is False
|
||||
assert result.connection is None
|
||||
assert len(result.containers) == 1
|
||||
|
||||
def test_get_status_connected(self, provider_with_docker, mock_docker_mw, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
provider_with_docker._active_container = "gr-test"
|
||||
|
||||
result = provider_with_docker.get_status()
|
||||
|
||||
assert result.connected is True
|
||||
assert result.connection is not None
|
||||
mock_xmlrpc_mw.get_connection_info.assert_called()
|
||||
|
||||
def test_get_status_handles_docker_error(self, provider_with_docker, mock_docker_mw):
|
||||
mock_docker_mw.list_containers.side_effect = Exception("Docker error")
|
||||
|
||||
result = provider_with_docker.get_status()
|
||||
|
||||
assert result.containers == [] # Gracefully handles error
|
||||
|
||||
|
||||
class TestVariableControl:
|
||||
def test_list_variables(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
|
||||
result = provider_with_docker.list_variables()
|
||||
|
||||
assert len(result) == 2
|
||||
assert all(isinstance(v, VariableModel) for v in result)
|
||||
mock_xmlrpc_mw.list_variables.assert_called_once()
|
||||
|
||||
def test_list_variables_requires_connection(self, provider_with_docker):
|
||||
with pytest.raises(RuntimeError, match="Not connected"):
|
||||
provider_with_docker.list_variables()
|
||||
|
||||
def test_get_variable(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
|
||||
result = provider_with_docker.get_variable("freq")
|
||||
|
||||
assert result == 1e6
|
||||
mock_xmlrpc_mw.get_variable.assert_called_once_with("freq")
|
||||
|
||||
def test_set_variable(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
|
||||
result = provider_with_docker.set_variable("freq", 2e6)
|
||||
|
||||
assert result is True
|
||||
mock_xmlrpc_mw.set_variable.assert_called_once_with("freq", 2e6)
|
||||
|
||||
|
||||
class TestFlowgraphControl:
|
||||
def test_start(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
assert provider_with_docker.start() is True
|
||||
mock_xmlrpc_mw.start.assert_called_once()
|
||||
|
||||
def test_stop(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
assert provider_with_docker.stop() is True
|
||||
mock_xmlrpc_mw.stop.assert_called_once()
|
||||
|
||||
def test_lock(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
assert provider_with_docker.lock() is True
|
||||
mock_xmlrpc_mw.lock.assert_called_once()
|
||||
|
||||
def test_unlock(self, provider_with_docker, mock_xmlrpc_mw):
|
||||
provider_with_docker._xmlrpc = mock_xmlrpc_mw
|
||||
assert provider_with_docker.unlock() is True
|
||||
mock_xmlrpc_mw.unlock.assert_called_once()
|
||||
|
||||
def test_flowgraph_control_requires_connection(self, provider_with_docker):
|
||||
with pytest.raises(RuntimeError, match="Not connected"):
|
||||
provider_with_docker.start()
|
||||
|
||||
|
||||
class TestVisualFeedback:
|
||||
def test_capture_screenshot_with_name(self, provider_with_docker, mock_docker_mw):
|
||||
result = provider_with_docker.capture_screenshot("gr-test")
|
||||
|
||||
assert isinstance(result, ScreenshotModel)
|
||||
mock_docker_mw.capture_screenshot.assert_called_once_with("gr-test")
|
||||
|
||||
def test_capture_screenshot_uses_active_container(self, provider_with_docker, mock_docker_mw):
|
||||
provider_with_docker._active_container = "gr-active"
|
||||
|
||||
provider_with_docker.capture_screenshot()
|
||||
|
||||
mock_docker_mw.capture_screenshot.assert_called_once_with("gr-active")
|
||||
|
||||
def test_capture_screenshot_requires_container(self, provider_with_docker):
|
||||
with pytest.raises(RuntimeError, match="No container specified"):
|
||||
provider_with_docker.capture_screenshot()
|
||||
|
||||
def test_get_container_logs_with_name(self, provider_with_docker, mock_docker_mw):
|
||||
result = provider_with_docker.get_container_logs("gr-test", tail=50)
|
||||
|
||||
assert "flowgraph started" in result
|
||||
mock_docker_mw.get_logs.assert_called_once_with("gr-test", tail=50)
|
||||
|
||||
def test_get_container_logs_uses_active_container(self, provider_with_docker, mock_docker_mw):
|
||||
provider_with_docker._active_container = "gr-active"
|
||||
|
||||
provider_with_docker.get_container_logs()
|
||||
|
||||
mock_docker_mw.get_logs.assert_called_once_with("gr-active", tail=100)
|
||||
|
||||
def test_get_container_logs_requires_container(self, provider_with_docker):
|
||||
with pytest.raises(RuntimeError, match="No container specified"):
|
||||
provider_with_docker.get_container_logs()
|
||||
Loading…
x
Reference in New Issue
Block a user