runtime: add dynamic port allocation for Docker containers
Prevent silent Docker bind failures by checking port availability before container creation. Supports auto-allocation (port=0) and patches compiled flowgraphs when the embedded XML-RPC port differs from the requested port.
This commit is contained in:
parent
0afb2f5b6e
commit
75d19eb6dd
@ -5,6 +5,13 @@ import logging
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
from gnuradio_mcp.middlewares.ports import (
|
||||||
|
PortConflictError,
|
||||||
|
detect_xmlrpc_port,
|
||||||
|
find_free_port,
|
||||||
|
is_port_available,
|
||||||
|
patch_xmlrpc_port,
|
||||||
|
)
|
||||||
from gnuradio_mcp.models import ContainerModel, ScreenshotModel
|
from gnuradio_mcp.models import ContainerModel, ScreenshotModel
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -72,6 +79,24 @@ class DockerMiddleware:
|
|||||||
if not fg_path.exists():
|
if not fg_path.exists():
|
||||||
raise FileNotFoundError(f"Flowgraph not found: {fg_path}")
|
raise FileNotFoundError(f"Flowgraph not found: {fg_path}")
|
||||||
|
|
||||||
|
# --- Port resolution ---
|
||||||
|
xmlrpc_port = self._resolve_port(xmlrpc_port, "XML-RPC")
|
||||||
|
if enable_controlport:
|
||||||
|
controlport_port = self._resolve_port(controlport_port, "ControlPort")
|
||||||
|
vnc_port_resolved: int | None = None
|
||||||
|
if enable_vnc:
|
||||||
|
vnc_port_resolved = self._resolve_port(DEFAULT_VNC_PORT, "VNC")
|
||||||
|
|
||||||
|
# --- Flowgraph port patching ---
|
||||||
|
embedded_port = detect_xmlrpc_port(fg_path)
|
||||||
|
if embedded_port is not None and embedded_port != xmlrpc_port:
|
||||||
|
fg_path = patch_xmlrpc_port(fg_path, xmlrpc_port)
|
||||||
|
logger.info(
|
||||||
|
"Patched flowgraph XML-RPC port: %d -> %d",
|
||||||
|
embedded_port,
|
||||||
|
xmlrpc_port,
|
||||||
|
)
|
||||||
|
|
||||||
# Select image based on coverage mode
|
# Select image based on coverage mode
|
||||||
image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE
|
image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE
|
||||||
|
|
||||||
@ -86,10 +111,8 @@ class DockerMiddleware:
|
|||||||
env["ENABLE_PERF_COUNTERS"] = "True" if enable_perf_counters else "False"
|
env["ENABLE_PERF_COUNTERS"] = "True" if enable_perf_counters else "False"
|
||||||
|
|
||||||
ports: dict[str, int] = {f"{xmlrpc_port}/tcp": xmlrpc_port}
|
ports: dict[str, int] = {f"{xmlrpc_port}/tcp": xmlrpc_port}
|
||||||
vnc_port: int | None = None
|
if enable_vnc and vnc_port_resolved is not None:
|
||||||
if enable_vnc:
|
ports[f"{vnc_port_resolved}/tcp"] = vnc_port_resolved
|
||||||
vnc_port = DEFAULT_VNC_PORT
|
|
||||||
ports[f"{vnc_port}/tcp"] = vnc_port
|
|
||||||
if enable_controlport:
|
if enable_controlport:
|
||||||
ports[f"{controlport_port}/tcp"] = controlport_port
|
ports[f"{controlport_port}/tcp"] = controlport_port
|
||||||
|
|
||||||
@ -139,13 +162,27 @@ class DockerMiddleware:
|
|||||||
status="running",
|
status="running",
|
||||||
flowgraph_path=str(fg_path),
|
flowgraph_path=str(fg_path),
|
||||||
xmlrpc_port=xmlrpc_port,
|
xmlrpc_port=xmlrpc_port,
|
||||||
vnc_port=vnc_port,
|
vnc_port=vnc_port_resolved,
|
||||||
controlport_port=controlport_port if enable_controlport else None,
|
controlport_port=controlport_port if enable_controlport else None,
|
||||||
device_paths=device_paths or [],
|
device_paths=device_paths or [],
|
||||||
coverage_enabled=enable_coverage,
|
coverage_enabled=enable_coverage,
|
||||||
controlport_enabled=enable_controlport,
|
controlport_enabled=enable_controlport,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _resolve_port(port: int, label: str) -> int:
|
||||||
|
"""Resolve a port value: 0 means auto-allocate, otherwise check availability."""
|
||||||
|
if port == 0:
|
||||||
|
allocated = find_free_port()
|
||||||
|
logger.info("Auto-allocated %s port: %d", label, allocated)
|
||||||
|
return allocated
|
||||||
|
if not is_port_available(port):
|
||||||
|
raise PortConflictError(
|
||||||
|
f"{label} port {port} is already in use. "
|
||||||
|
f"Use port=0 for auto-allocation."
|
||||||
|
)
|
||||||
|
return port
|
||||||
|
|
||||||
def list_containers(self) -> list[ContainerModel]:
|
def list_containers(self) -> list[ContainerModel]:
|
||||||
"""List all gr-mcp managed containers."""
|
"""List all gr-mcp managed containers."""
|
||||||
containers = self._client.containers.list(
|
containers = self._client.containers.list(
|
||||||
|
|||||||
96
src/gnuradio_mcp/middlewares/ports.py
Normal file
96
src/gnuradio_mcp/middlewares/ports.py
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
"""Port utilities for dynamic allocation and flowgraph patching.
|
||||||
|
|
||||||
|
Provides pre-flight port checking so Docker bind failures are caught
|
||||||
|
early with actionable error messages, and flowgraph patching so the
|
||||||
|
compiled .py can use a different XML-RPC port than what GRC baked in.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import socket
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Regex for SimpleXMLRPCServer(('addr', PORT)) in compiled flowgraphs.
|
||||||
|
# GRC emits: xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), ...)
|
||||||
|
_XMLRPC_PORT_RE = re.compile(r"(SimpleXMLRPCServer\(\s*\([^,]+,\s*)(\d+)(\s*\))")
|
||||||
|
|
||||||
|
|
||||||
|
class PortConflictError(RuntimeError):
|
||||||
|
"""Raised when a requested port is already in use."""
|
||||||
|
|
||||||
|
|
||||||
|
def is_port_available(port: int) -> bool:
|
||||||
|
"""Check if a TCP port is available on localhost.
|
||||||
|
|
||||||
|
Attempts to bind a socket to the port. Returns True if the bind
|
||||||
|
succeeds (port is free), False otherwise.
|
||||||
|
"""
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||||
|
try:
|
||||||
|
sock.bind(("127.0.0.1", port))
|
||||||
|
return True
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def find_free_port() -> int:
|
||||||
|
"""Find a free TCP port using the OS ephemeral range.
|
||||||
|
|
||||||
|
Binds to port 0, which lets the kernel pick an available port,
|
||||||
|
then closes the socket and returns the chosen port. A second
|
||||||
|
availability check guards against the (rare) race where another
|
||||||
|
process grabs it between close and Docker bind.
|
||||||
|
"""
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||||
|
sock.bind(("", 0))
|
||||||
|
port = sock.getsockname()[1]
|
||||||
|
return port
|
||||||
|
|
||||||
|
|
||||||
|
def detect_xmlrpc_port(flowgraph_py: Path) -> int | None:
|
||||||
|
"""Extract the SimpleXMLRPCServer port from a compiled flowgraph.
|
||||||
|
|
||||||
|
Returns the port number, or None if no XML-RPC server is found.
|
||||||
|
"""
|
||||||
|
text = flowgraph_py.read_text()
|
||||||
|
match = _XMLRPC_PORT_RE.search(text)
|
||||||
|
if match:
|
||||||
|
return int(match.group(2))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def patch_xmlrpc_port(flowgraph_py: Path, new_port: int) -> Path:
|
||||||
|
"""Create a patched copy of the flowgraph with a different XML-RPC port.
|
||||||
|
|
||||||
|
The original file is never modified. The patched copy is placed in
|
||||||
|
the same directory so Docker volume mounts pick it up automatically.
|
||||||
|
|
||||||
|
Returns the path to the patched temporary file.
|
||||||
|
"""
|
||||||
|
text = flowgraph_py.read_text()
|
||||||
|
patched, count = _XMLRPC_PORT_RE.subn(
|
||||||
|
rf"\g<1>{new_port}\3",
|
||||||
|
text,
|
||||||
|
)
|
||||||
|
if count == 0:
|
||||||
|
raise ValueError(f"No SimpleXMLRPCServer port found in {flowgraph_py} to patch")
|
||||||
|
|
||||||
|
# Write to a temp file in the same directory (same Docker mount)
|
||||||
|
fd, tmp_path = tempfile.mkstemp(
|
||||||
|
suffix=".py",
|
||||||
|
prefix=f"{flowgraph_py.stem}_patched_",
|
||||||
|
dir=flowgraph_py.parent,
|
||||||
|
)
|
||||||
|
tmp = Path(tmp_path)
|
||||||
|
tmp.write_text(patched)
|
||||||
|
# mkstemp opens the fd; we wrote via Path so close it
|
||||||
|
os.close(fd)
|
||||||
|
|
||||||
|
logger.debug("Patched flowgraph written to %s", tmp)
|
||||||
|
return tmp
|
||||||
@ -1,5 +1,6 @@
|
|||||||
"""Unit tests for DockerMiddleware with mocked Docker client."""
|
"""Unit tests for DockerMiddleware with mocked Docker client."""
|
||||||
|
|
||||||
|
import socket
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@ -8,6 +9,7 @@ from gnuradio_mcp.middlewares.docker import (
|
|||||||
DEFAULT_XMLRPC_PORT,
|
DEFAULT_XMLRPC_PORT,
|
||||||
DockerMiddleware,
|
DockerMiddleware,
|
||||||
)
|
)
|
||||||
|
from gnuradio_mcp.middlewares.ports import PortConflictError
|
||||||
from gnuradio_mcp.models import ContainerModel, ScreenshotModel
|
from gnuradio_mcp.models import ContainerModel, ScreenshotModel
|
||||||
|
|
||||||
|
|
||||||
@ -44,6 +46,14 @@ class TestDockerMiddlewareCreate:
|
|||||||
|
|
||||||
|
|
||||||
class TestLaunch:
|
class TestLaunch:
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _bypass_port_check(self):
|
||||||
|
"""Existing launch tests don't care about port availability."""
|
||||||
|
with patch(
|
||||||
|
"gnuradio_mcp.middlewares.docker.is_port_available", return_value=True
|
||||||
|
):
|
||||||
|
yield
|
||||||
|
|
||||||
def test_launch_creates_container(self, docker_mw, mock_docker_client, tmp_path):
|
def test_launch_creates_container(self, docker_mw, mock_docker_client, tmp_path):
|
||||||
fg_file = tmp_path / "test.grc"
|
fg_file = tmp_path / "test.grc"
|
||||||
fg_file.write_text("<flowgraph/>")
|
fg_file.write_text("<flowgraph/>")
|
||||||
@ -250,6 +260,13 @@ class TestGetXmlRpcPort:
|
|||||||
|
|
||||||
|
|
||||||
class TestCoverage:
|
class TestCoverage:
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _bypass_port_check(self):
|
||||||
|
with patch(
|
||||||
|
"gnuradio_mcp.middlewares.docker.is_port_available", return_value=True
|
||||||
|
):
|
||||||
|
yield
|
||||||
|
|
||||||
def test_launch_with_coverage_uses_coverage_image(
|
def test_launch_with_coverage_uses_coverage_image(
|
||||||
self, docker_mw, mock_docker_client, tmp_path
|
self, docker_mw, mock_docker_client, tmp_path
|
||||||
):
|
):
|
||||||
@ -386,3 +403,101 @@ class TestCoverage:
|
|||||||
# Should still return True (container will be killed)
|
# Should still return True (container will be killed)
|
||||||
assert result is True
|
assert result is True
|
||||||
assert "didn't stop gracefully" in caplog.text
|
assert "didn't stop gracefully" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
# Sample flowgraph with embedded XML-RPC port
|
||||||
|
_SAMPLE_FG = """\
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
from xmlrpc.server import SimpleXMLRPCServer
|
||||||
|
class top_block:
|
||||||
|
def __init__(self):
|
||||||
|
self.xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class TestPortAllocation:
|
||||||
|
def test_launch_auto_allocates_port(self, docker_mw, mock_docker_client, tmp_path):
|
||||||
|
"""xmlrpc_port=0 should auto-allocate a free port."""
|
||||||
|
fg_file = tmp_path / "test.py"
|
||||||
|
fg_file.write_text(_SAMPLE_FG)
|
||||||
|
|
||||||
|
mock_container = MagicMock()
|
||||||
|
mock_container.id = "abc123def456"
|
||||||
|
mock_docker_client.containers.run.return_value = mock_container
|
||||||
|
|
||||||
|
result = docker_mw.launch(
|
||||||
|
flowgraph_path=str(fg_file),
|
||||||
|
name="test-auto",
|
||||||
|
xmlrpc_port=0,
|
||||||
|
)
|
||||||
|
# Auto-allocated port should be > 0 and not the default
|
||||||
|
assert result.xmlrpc_port > 0
|
||||||
|
|
||||||
|
def test_launch_occupied_port_raises(self, docker_mw, mock_docker_client, tmp_path):
|
||||||
|
"""Requesting a port that's already in use should raise PortConflictError."""
|
||||||
|
fg_file = tmp_path / "test.py"
|
||||||
|
fg_file.write_text(_SAMPLE_FG)
|
||||||
|
|
||||||
|
# Hold a port open
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
s.bind(("127.0.0.1", 0))
|
||||||
|
occupied_port = s.getsockname()[1]
|
||||||
|
|
||||||
|
with pytest.raises(PortConflictError, match="already in use"):
|
||||||
|
docker_mw.launch(
|
||||||
|
flowgraph_path=str(fg_file),
|
||||||
|
name="test-conflict",
|
||||||
|
xmlrpc_port=occupied_port,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_launch_patches_mismatched_port(
|
||||||
|
self, docker_mw, mock_docker_client, tmp_path
|
||||||
|
):
|
||||||
|
"""When flowgraph has port 8080 but we request 9999, it should be patched."""
|
||||||
|
fg_file = tmp_path / "flowgraph.py"
|
||||||
|
fg_file.write_text(_SAMPLE_FG)
|
||||||
|
|
||||||
|
mock_container = MagicMock()
|
||||||
|
mock_container.id = "abc123def456"
|
||||||
|
mock_docker_client.containers.run.return_value = mock_container
|
||||||
|
|
||||||
|
# Use a port we know is free (mock is_port_available for determinism)
|
||||||
|
with patch(
|
||||||
|
"gnuradio_mcp.middlewares.docker.is_port_available", return_value=True
|
||||||
|
):
|
||||||
|
result = docker_mw.launch(
|
||||||
|
flowgraph_path=str(fg_file),
|
||||||
|
name="test-patch",
|
||||||
|
xmlrpc_port=9999,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.xmlrpc_port == 9999
|
||||||
|
|
||||||
|
# Original file should be unchanged
|
||||||
|
assert "8080" in fg_file.read_text()
|
||||||
|
|
||||||
|
def test_launch_no_patch_when_ports_match(
|
||||||
|
self, docker_mw, mock_docker_client, tmp_path
|
||||||
|
):
|
||||||
|
"""When flowgraph port matches requested port, no patching should occur."""
|
||||||
|
fg_file = tmp_path / "flowgraph.py"
|
||||||
|
fg_file.write_text(_SAMPLE_FG)
|
||||||
|
|
||||||
|
mock_container = MagicMock()
|
||||||
|
mock_container.id = "abc123def456"
|
||||||
|
mock_docker_client.containers.run.return_value = mock_container
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"gnuradio_mcp.middlewares.docker.is_port_available", return_value=True
|
||||||
|
):
|
||||||
|
result = docker_mw.launch(
|
||||||
|
flowgraph_path=str(fg_file),
|
||||||
|
name="test-match",
|
||||||
|
xmlrpc_port=8080,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.xmlrpc_port == 8080
|
||||||
|
# No patched files should exist (only the original)
|
||||||
|
py_files = list(tmp_path.glob("*.py"))
|
||||||
|
assert len(py_files) == 1
|
||||||
|
|||||||
124
tests/unit/test_ports.py
Normal file
124
tests/unit/test_ports.py
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
"""Unit tests for port utilities."""
|
||||||
|
|
||||||
|
import socket
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gnuradio_mcp.middlewares.ports import (
|
||||||
|
PortConflictError,
|
||||||
|
detect_xmlrpc_port,
|
||||||
|
find_free_port,
|
||||||
|
is_port_available,
|
||||||
|
patch_xmlrpc_port,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Sample flowgraph snippet matching what GRC actually generates
|
||||||
|
SAMPLE_FLOWGRAPH = """\
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import sys
|
||||||
|
from xmlrpc.server import SimpleXMLRPCServer
|
||||||
|
|
||||||
|
class top_block:
|
||||||
|
def __init__(self):
|
||||||
|
self.xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
|
||||||
|
self.xmlrpc_server_0.register_instance(self)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
tb = top_block()
|
||||||
|
tb.xmlrpc_server_0.serve_forever()
|
||||||
|
"""
|
||||||
|
|
||||||
|
SAMPLE_NO_XMLRPC = """\
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
class top_block:
|
||||||
|
def __init__(self):
|
||||||
|
self.source = some_source()
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsPortAvailable:
|
||||||
|
def test_free_port_is_available(self):
|
||||||
|
# Get a port the OS says is free, then check our function agrees
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.bind(("", 0))
|
||||||
|
port = s.getsockname()[1]
|
||||||
|
# Socket is closed, port should be free
|
||||||
|
assert is_port_available(port) is True
|
||||||
|
|
||||||
|
def test_occupied_port_is_unavailable(self):
|
||||||
|
# Hold a port open and verify our function detects it
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.bind(("127.0.0.1", 0))
|
||||||
|
port = s.getsockname()[1]
|
||||||
|
s.listen(1)
|
||||||
|
# Socket still bound and listening — port is occupied
|
||||||
|
assert is_port_available(port) is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestFindFreePort:
|
||||||
|
def test_returns_available_port(self):
|
||||||
|
port = find_free_port()
|
||||||
|
assert isinstance(port, int)
|
||||||
|
assert 1024 <= port <= 65535
|
||||||
|
|
||||||
|
def test_returned_port_is_usable(self):
|
||||||
|
port = find_free_port()
|
||||||
|
# Should be able to bind to it
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.bind(("127.0.0.1", port))
|
||||||
|
|
||||||
|
|
||||||
|
class TestDetectXmlrpcPort:
|
||||||
|
def test_detects_port(self, tmp_path):
|
||||||
|
fg = tmp_path / "flowgraph.py"
|
||||||
|
fg.write_text(SAMPLE_FLOWGRAPH)
|
||||||
|
assert detect_xmlrpc_port(fg) == 8080
|
||||||
|
|
||||||
|
def test_returns_none_when_missing(self, tmp_path):
|
||||||
|
fg = tmp_path / "no_xmlrpc.py"
|
||||||
|
fg.write_text(SAMPLE_NO_XMLRPC)
|
||||||
|
assert detect_xmlrpc_port(fg) is None
|
||||||
|
|
||||||
|
def test_detects_different_port(self, tmp_path):
|
||||||
|
fg = tmp_path / "custom.py"
|
||||||
|
fg.write_text(SAMPLE_FLOWGRAPH.replace("8080", "9999"))
|
||||||
|
assert detect_xmlrpc_port(fg) == 9999
|
||||||
|
|
||||||
|
|
||||||
|
class TestPatchXmlrpcPort:
|
||||||
|
def test_patches_port(self, tmp_path):
|
||||||
|
fg = tmp_path / "flowgraph.py"
|
||||||
|
fg.write_text(SAMPLE_FLOWGRAPH)
|
||||||
|
|
||||||
|
patched = patch_xmlrpc_port(fg, 12345)
|
||||||
|
content = patched.read_text()
|
||||||
|
assert "12345" in content
|
||||||
|
assert "8080" not in content
|
||||||
|
|
||||||
|
def test_preserves_original(self, tmp_path):
|
||||||
|
fg = tmp_path / "flowgraph.py"
|
||||||
|
fg.write_text(SAMPLE_FLOWGRAPH)
|
||||||
|
original_text = fg.read_text()
|
||||||
|
|
||||||
|
patch_xmlrpc_port(fg, 12345)
|
||||||
|
assert fg.read_text() == original_text
|
||||||
|
|
||||||
|
def test_patched_file_in_same_directory(self, tmp_path):
|
||||||
|
fg = tmp_path / "flowgraph.py"
|
||||||
|
fg.write_text(SAMPLE_FLOWGRAPH)
|
||||||
|
|
||||||
|
patched = patch_xmlrpc_port(fg, 12345)
|
||||||
|
assert patched.parent == fg.parent
|
||||||
|
|
||||||
|
def test_raises_on_no_match(self, tmp_path):
|
||||||
|
fg = tmp_path / "no_xmlrpc.py"
|
||||||
|
fg.write_text(SAMPLE_NO_XMLRPC)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="No SimpleXMLRPCServer"):
|
||||||
|
patch_xmlrpc_port(fg, 12345)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPortConflictError:
|
||||||
|
def test_is_runtime_error(self):
|
||||||
|
err = PortConflictError("port 8080 in use")
|
||||||
|
assert isinstance(err, RuntimeError)
|
||||||
Loading…
x
Reference in New Issue
Block a user