diff --git a/docker/.coveragerc b/docker/.coveragerc
new file mode 100644
index 0000000..53355bd
--- /dev/null
+++ b/docker/.coveragerc
@@ -0,0 +1,60 @@
+# Coverage.py configuration for GNU Radio flowgraph execution
+# See: https://coverage.readthedocs.io/en/latest/config.html
+
+[run]
+# Source packages to measure coverage for
+source =
+ gnuradio
+ /flowgraphs
+
+# Include branch coverage for more detailed analysis
+branch = True
+
+# Enable parallel mode for combining coverage from multiple runs
+parallel = True
+
+# Omit test files and virtual environments from coverage
+omit =
+ */tests/*
+ */test_*
+ */qa_*
+ */.venv/*
+ */site-packages/*
+ */dist-packages/pybind11*
+
+# Dynamic context based on function names (useful for debugging)
+dynamic_context = test_function
+
+[paths]
+# Map container paths to host paths for combining coverage
+source =
+ /usr/lib/python3/dist-packages/gnuradio
+ /flowgraphs
+
+[report]
+# Exclude lines from coverage analysis
+exclude_lines =
+ pragma: no cover
+ def __repr__
+ raise NotImplementedError
+ if __name__ == .__main__.:
+ if TYPE_CHECKING:
+ @abstractmethod
+
+# Show missing line numbers in reports
+show_missing = True
+
+# Minimum coverage percentage (optional, for CI)
+# fail_under = 50
+
+[html]
+# HTML report output directory
+directory = /coverage/htmlcov
+
+[xml]
+# Cobertura XML output (for CI integration)
+output = /coverage/coverage.xml
+
+[json]
+# JSON output for programmatic access
+output = /coverage/coverage.json
diff --git a/docker/Dockerfile.gnuradio-coverage b/docker/Dockerfile.gnuradio-coverage
new file mode 100644
index 0000000..56fd359
--- /dev/null
+++ b/docker/Dockerfile.gnuradio-coverage
@@ -0,0 +1,39 @@
+# Coverage-enabled GNU Radio runtime container
+# Extends gnuradio-runtime with Python code coverage instrumentation
+#
+# Build:
+# docker build -f docker/Dockerfile.gnuradio-coverage -t gnuradio-coverage:latest docker/
+#
+# Usage:
+# docker run --rm -v $(pwd)/coverage:/coverage \
+# -e ENABLE_COVERAGE=1 \
+# gnuradio-coverage:latest python3 /flowgraphs/my_flowgraph.py
+#
+# The coverage data is written to /coverage/.coverage on container exit.
+# Mount this volume to persist coverage data between runs.
+
+FROM gnuradio-runtime:latest
+
+LABEL org.opencontainers.image.title="GNU Radio Coverage Runtime"
+LABEL org.opencontainers.image.description="GNU Radio runtime with Python code coverage support"
+
+# Install Python coverage.py via apt (pip3 not available in base image)
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ python3-coverage \
+ && rm -rf /var/lib/apt/lists/*
+
+# Copy coverage configuration
+COPY .coveragerc /etc/coveragerc
+
+# Copy coverage-aware entrypoint
+COPY entrypoint-coverage.sh /entrypoint-coverage.sh
+RUN chmod +x /entrypoint-coverage.sh
+
+# Coverage data directory - mount a volume here to persist
+VOLUME /coverage
+
+# Environment variables for coverage
+ENV COVERAGE_FILE=/coverage/.coverage
+ENV COVERAGE_RCFILE=/etc/coveragerc
+
+ENTRYPOINT ["/entrypoint-coverage.sh"]
diff --git a/docker/entrypoint-coverage.sh b/docker/entrypoint-coverage.sh
new file mode 100644
index 0000000..fcb5d78
--- /dev/null
+++ b/docker/entrypoint-coverage.sh
@@ -0,0 +1,53 @@
+#!/bin/bash
+# Coverage-aware entrypoint for GNU Radio flowgraph execution
+#
+# When ENABLE_COVERAGE=1:
+# - Wraps the command with `coverage run`
+# - Writes coverage data to /coverage/.coverage
+# - Coverage can be collected after container stops
+#
+# When ENABLE_COVERAGE=0 (default):
+# - Behaves identically to the standard entrypoint
+#
+set -e
+
+# Start Xvfb for headless QT rendering
+Xvfb :99 -screen 0 1280x720x24 -ac +extension GLX +render -noreset &
+XVFB_PID=$!
+
+# Wait for Xvfb to be ready
+while ! xdpyinfo -display :99 >/dev/null 2>&1; do
+ sleep 0.1
+done
+echo "Xvfb ready on :99"
+
+# Optional VNC server for visual debugging
+if [ "${ENABLE_VNC:-0}" = "1" ]; then
+ x11vnc -display :99 -forever -nopw -shared -rfbport 5900 &
+ echo "VNC server on :5900"
+fi
+
+# Ensure coverage directory exists and is writable
+if [ "${ENABLE_COVERAGE:-0}" = "1" ]; then
+ mkdir -p /coverage
+ echo "Coverage enabled, data will be written to ${COVERAGE_FILE:-/coverage/.coverage}"
+fi
+
+# Run the flowgraph with or without coverage
+if [ "${ENABLE_COVERAGE:-0}" = "1" ]; then
+ # Use coverage run to instrument Python execution
+ # Note: apt installs as python3-coverage; use python3 -m coverage for flexibility
+ # --parallel-mode enables unique data files for parallel runs
+ # --source limits coverage to GNU Radio packages
+ #
+ # Strip 'python3' prefix if present (Docker middleware passes "python3 /script.py")
+ if [ "$1" = "python3" ] || [ "$1" = "python" ]; then
+ shift
+ fi
+ exec python3 -m coverage run \
+ --rcfile="${COVERAGE_RCFILE:-/etc/coveragerc}" \
+ --data-file="${COVERAGE_FILE:-/coverage/.coverage}" \
+ "$@"
+else
+ exec "$@"
+fi
diff --git a/src/gnuradio_mcp/middlewares/docker.py b/src/gnuradio_mcp/middlewares/docker.py
index aa3b0ae..414c63a 100644
--- a/src/gnuradio_mcp/middlewares/docker.py
+++ b/src/gnuradio_mcp/middlewares/docker.py
@@ -11,8 +11,12 @@ logger = logging.getLogger(__name__)
DEFAULT_XMLRPC_PORT = 8080
DEFAULT_VNC_PORT = 5900
+DEFAULT_STOP_TIMEOUT = 30 # Seconds to wait for graceful shutdown (coverage needs time)
RUNTIME_IMAGE = "gnuradio-runtime:latest"
+COVERAGE_IMAGE = "gnuradio-coverage:latest"
CONTAINER_FLOWGRAPH_DIR = "/flowgraphs"
+CONTAINER_COVERAGE_DIR = "/coverage"
+HOST_COVERAGE_BASE = "/tmp/gr-coverage"
class DockerMiddleware:
@@ -44,16 +48,31 @@ class DockerMiddleware:
name: str,
xmlrpc_port: int = DEFAULT_XMLRPC_PORT,
enable_vnc: bool = False,
+ enable_coverage: bool = False,
device_paths: list[str] | None = None,
) -> ContainerModel:
- """Launch a flowgraph in a Docker container with Xvfb."""
+ """Launch a flowgraph in a Docker container with Xvfb.
+
+ Args:
+ flowgraph_path: Path to the .py flowgraph file
+ name: Container name
+ xmlrpc_port: Port for XML-RPC variable control
+ enable_vnc: Enable VNC server for visual debugging
+ enable_coverage: Use coverage image and collect Python coverage data
+ device_paths: Host device paths to pass through (e.g., /dev/ttyUSB0)
+ """
fg_path = Path(flowgraph_path).resolve()
if not fg_path.exists():
raise FileNotFoundError(f"Flowgraph not found: {fg_path}")
+ # Select image based on coverage mode
+ image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE
+
env = {"DISPLAY": ":99", "XMLRPC_PORT": str(xmlrpc_port)}
if enable_vnc:
env["ENABLE_VNC"] = "1"
+ if enable_coverage:
+ env["ENABLE_COVERAGE"] = "1"
ports: dict[str, int] = {f"{xmlrpc_port}/tcp": xmlrpc_port}
vnc_port: int | None = None
@@ -68,12 +87,21 @@ class DockerMiddleware:
}
}
+ # Mount coverage directory if coverage enabled
+ if enable_coverage:
+ coverage_dir = Path(HOST_COVERAGE_BASE) / name
+ coverage_dir.mkdir(parents=True, exist_ok=True)
+ volumes[str(coverage_dir)] = {
+ "bind": CONTAINER_COVERAGE_DIR,
+ "mode": "rw",
+ }
+
devices = [f"{d}:{d}:rwm" for d in (device_paths or [])]
container_fg_path = f"{CONTAINER_FLOWGRAPH_DIR}/{fg_path.name}"
container = self._client.containers.run(
- RUNTIME_IMAGE,
+ image,
command=["python3", container_fg_path],
name=name,
detach=True,
@@ -86,6 +114,7 @@ class DockerMiddleware:
"gr-mcp.flowgraph": str(fg_path),
"gr-mcp.xmlrpc-port": str(xmlrpc_port),
"gr-mcp.vnc-enabled": "1" if enable_vnc else "0",
+ "gr-mcp.coverage-enabled": "1" if enable_coverage else "0",
},
)
@@ -97,6 +126,7 @@ class DockerMiddleware:
xmlrpc_port=xmlrpc_port,
vnc_port=vnc_port,
device_paths=device_paths or [],
+ coverage_enabled=enable_coverage,
)
def list_containers(self) -> list[ContainerModel]:
@@ -117,14 +147,32 @@ class DockerMiddleware:
vnc_port=DEFAULT_VNC_PORT
if labels.get("gr-mcp.vnc-enabled") == "1" and c.status == "running"
else None,
+ coverage_enabled=labels.get("gr-mcp.coverage-enabled") == "1",
)
)
return result
- def stop(self, name: str) -> bool:
- """Stop a container by name."""
+ def stop(self, name: str, timeout: int = DEFAULT_STOP_TIMEOUT) -> bool:
+ """Stop a container gracefully with SIGTERM.
+
+ Uses a longer timeout (30s) to allow coverage data to be flushed.
+ Falls back to SIGKILL if container doesn't respond, but warns that
+ coverage data may be lost.
+
+ Args:
+ name: Container name
+ timeout: Seconds to wait for graceful shutdown before SIGKILL
+ """
container = self._client.containers.get(name)
- container.stop(timeout=10)
+ try:
+ container.stop(timeout=timeout)
+ except Exception as e:
+ # Timeout reached, container will be killed - coverage may be lost
+ logger.warning(
+ "Container %s didn't stop gracefully within %ds, "
+ "coverage data may be lost: %s",
+ name, timeout, e
+ )
return True
def remove(self, name: str, force: bool = False) -> bool:
@@ -163,3 +211,12 @@ class DockerMiddleware:
return int(
container.labels.get("gr-mcp.xmlrpc-port", DEFAULT_XMLRPC_PORT)
)
+
+ def is_coverage_enabled(self, name: str) -> bool:
+ """Check if coverage is enabled for a container."""
+ container = self._client.containers.get(name)
+ return container.labels.get("gr-mcp.coverage-enabled") == "1"
+
+ def get_coverage_dir(self, name: str) -> Path:
+ """Get the host-side coverage directory for a container."""
+ return Path(HOST_COVERAGE_BASE) / name
diff --git a/src/gnuradio_mcp/models.py b/src/gnuradio_mcp/models.py
index 96c33d6..c76d694 100644
--- a/src/gnuradio_mcp/models.py
+++ b/src/gnuradio_mcp/models.py
@@ -129,6 +129,7 @@ class ContainerModel(BaseModel):
xmlrpc_port: int
vnc_port: int | None = None
device_paths: list[str] = []
+ coverage_enabled: bool = False
class VariableModel(BaseModel):
@@ -155,3 +156,27 @@ class RuntimeStatusModel(BaseModel):
connected: bool
connection: ConnectionInfoModel | None = None
containers: list[ContainerModel] = []
+
+
+# ──────────────────────────────────────────────
+# Coverage Models (Cross-Process Code Coverage)
+# ──────────────────────────────────────────────
+
+
+class CoverageDataModel(BaseModel):
+ """Summary of collected coverage data."""
+
+ container_name: str
+ coverage_file: str
+ summary: str
+ lines_covered: int | None = None
+ lines_total: int | None = None
+ coverage_percent: float | None = None
+
+
+class CoverageReportModel(BaseModel):
+ """Generated coverage report (HTML, XML, JSON)."""
+
+ container_name: str
+ format: Literal["html", "xml", "json"]
+ report_path: str
diff --git a/src/gnuradio_mcp/providers/mcp_runtime.py b/src/gnuradio_mcp/providers/mcp_runtime.py
index 77fa6f5..b6997b0 100644
--- a/src/gnuradio_mcp/providers/mcp_runtime.py
+++ b/src/gnuradio_mcp/providers/mcp_runtime.py
@@ -44,14 +44,24 @@ class McpRuntimeProvider:
# Docker-dependent tools
if p._has_docker:
+ # Container lifecycle
self._mcp.tool(p.launch_flowgraph)
self._mcp.tool(p.list_containers)
self._mcp.tool(p.stop_flowgraph)
self._mcp.tool(p.remove_flowgraph)
self._mcp.tool(p.connect_to_container)
+
+ # Visual feedback
self._mcp.tool(p.capture_screenshot)
self._mcp.tool(p.get_container_logs)
- logger.info("Registered 17 runtime tools (Docker available)")
+
+ # Coverage collection
+ self._mcp.tool(p.collect_coverage)
+ self._mcp.tool(p.generate_coverage_report)
+ self._mcp.tool(p.combine_coverage)
+ self._mcp.tool(p.delete_coverage)
+
+ logger.info("Registered 21 runtime tools (Docker available)")
else:
logger.info(
"Registered 10 runtime tools (Docker unavailable, "
diff --git a/src/gnuradio_mcp/providers/runtime.py b/src/gnuradio_mcp/providers/runtime.py
index 05b1bdf..77bfa00 100644
--- a/src/gnuradio_mcp/providers/runtime.py
+++ b/src/gnuradio_mcp/providers/runtime.py
@@ -1,13 +1,19 @@
from __future__ import annotations
import logging
-from typing import Any
+import re
+import shutil
+import subprocess
+from pathlib import Path
+from typing import Any, Literal
-from gnuradio_mcp.middlewares.docker import DockerMiddleware
+from gnuradio_mcp.middlewares.docker import DockerMiddleware, HOST_COVERAGE_BASE
from gnuradio_mcp.middlewares.xmlrpc import XmlRpcMiddleware
from gnuradio_mcp.models import (
ConnectionInfoModel,
ContainerModel,
+ CoverageDataModel,
+ CoverageReportModel,
RuntimeStatusModel,
ScreenshotModel,
VariableModel,
@@ -62,19 +68,28 @@ class RuntimeProvider:
name: str | None = None,
xmlrpc_port: int = 8080,
enable_vnc: bool = False,
+ enable_coverage: bool = False,
device_paths: list[str] | None = None,
) -> ContainerModel:
- """Launch a flowgraph in a Docker container with Xvfb."""
+ """Launch a flowgraph in a Docker container with Xvfb.
+
+ Args:
+ flowgraph_path: Path to the .py flowgraph file
+ name: Container name (defaults to 'gr-{stem}')
+ xmlrpc_port: Port for XML-RPC variable control
+ enable_vnc: Enable VNC server for visual debugging
+ enable_coverage: Enable Python code coverage collection
+ device_paths: Host device paths to pass through
+ """
docker = self._require_docker()
if name is None:
- from pathlib import Path
-
name = f"gr-{Path(flowgraph_path).stem}"
return docker.launch(
flowgraph_path=flowgraph_path,
name=name,
xmlrpc_port=xmlrpc_port,
enable_vnc=enable_vnc,
+ enable_coverage=enable_coverage,
device_paths=device_paths,
)
@@ -214,3 +229,260 @@ class RuntimeProvider:
"No container specified. Provide a name or connect to a container first."
)
return docker.get_logs(container_name, tail=tail)
+
+ # ──────────────────────────────────────────
+ # Coverage Collection
+ # ──────────────────────────────────────────
+
+ def _get_coverage_dir(self, name: str) -> Path:
+ """Get coverage directory for a container, ensure it exists."""
+ coverage_dir = Path(HOST_COVERAGE_BASE) / name
+ if not coverage_dir.exists():
+ raise FileNotFoundError(
+ f"No coverage data for container '{name}'. "
+ f"Was it launched with enable_coverage=True?"
+ )
+ return coverage_dir
+
+ def _parse_coverage_summary(self, output: str) -> dict[str, int | float | None]:
+ """Parse coverage report output for metrics.
+
+ Example output:
+ Name Stmts Miss Branch BrPart Cover
+ ---------------------------------------------------------
+ gnuradio/__init__.py 10 2 4 1 75%
+ ...
+ TOTAL 100 25 40 10 70%
+ """
+ result: dict[str, int | float | None] = {
+ "lines_covered": None,
+ "lines_total": None,
+ "coverage_percent": None,
+ }
+ # Look for TOTAL line
+ match = re.search(
+ r"^TOTAL\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)%",
+ output,
+ re.MULTILINE,
+ )
+ if match:
+ total_stmts = int(match.group(1))
+ miss_stmts = int(match.group(2))
+ result["lines_total"] = total_stmts
+ result["lines_covered"] = total_stmts - miss_stmts
+ result["coverage_percent"] = float(match.group(5))
+ return result
+
+ def collect_coverage(self, name: str) -> CoverageDataModel:
+ """Collect coverage data from a stopped container.
+
+ Combines any parallel coverage files and returns a summary.
+ Container must have been stopped (not removed) for coverage
+ data to be available.
+
+ Args:
+ name: Container name
+ """
+ coverage_dir = self._get_coverage_dir(name)
+
+ # First, combine any parallel files (idempotent if already combined)
+ # This handles both single-run and multi-run scenarios
+ subprocess.run(
+ ["coverage", "combine"],
+ cwd=coverage_dir,
+ capture_output=True,
+ )
+
+ coverage_file = coverage_dir / ".coverage"
+ if not coverage_file.exists():
+ # Check for parallel files that weren't combined
+ parallel_files = list(coverage_dir.glob(".coverage.*"))
+ if parallel_files:
+ raise RuntimeError(
+ f"Coverage combine failed. Found {len(parallel_files)} "
+ f"parallel files but no combined .coverage file."
+ )
+ raise FileNotFoundError(
+ f"No coverage data found in {coverage_dir}. "
+ f"Container may not have generated coverage data."
+ )
+
+ # Generate summary report
+ result = subprocess.run(
+ ["coverage", "report", "--data-file", str(coverage_file)],
+ capture_output=True,
+ text=True,
+ )
+
+ summary = result.stdout if result.returncode == 0 else result.stderr
+ metrics = self._parse_coverage_summary(summary)
+
+ return CoverageDataModel(
+ container_name=name,
+ coverage_file=str(coverage_file),
+ summary=summary,
+ lines_covered=metrics["lines_covered"],
+ lines_total=metrics["lines_total"],
+ coverage_percent=metrics["coverage_percent"],
+ )
+
+ def generate_coverage_report(
+ self,
+ name: str,
+ format: Literal["html", "xml", "json"] = "html",
+ ) -> CoverageReportModel:
+ """Generate a coverage report in the specified format.
+
+ Args:
+ name: Container name
+ format: Report format (html, xml, json)
+ """
+ coverage_dir = self._get_coverage_dir(name)
+ coverage_file = coverage_dir / ".coverage"
+
+ if not coverage_file.exists():
+ raise FileNotFoundError(
+ f"No combined coverage file for '{name}'. "
+ f"Call collect_coverage() first."
+ )
+
+ if format == "html":
+ report_path = coverage_dir / "htmlcov" / "index.html"
+ subprocess.run(
+ [
+ "coverage", "html",
+ "--data-file", str(coverage_file),
+ "-d", str(coverage_dir / "htmlcov"),
+ ],
+ capture_output=True,
+ check=True,
+ )
+ elif format == "xml":
+ report_path = coverage_dir / "coverage.xml"
+ subprocess.run(
+ [
+ "coverage", "xml",
+ "--data-file", str(coverage_file),
+ "-o", str(report_path),
+ ],
+ capture_output=True,
+ check=True,
+ )
+ elif format == "json":
+ report_path = coverage_dir / "coverage.json"
+ subprocess.run(
+ [
+ "coverage", "json",
+ "--data-file", str(coverage_file),
+ "-o", str(report_path),
+ ],
+ capture_output=True,
+ check=True,
+ )
+ else:
+ raise ValueError(f"Unsupported format: {format}")
+
+ return CoverageReportModel(
+ container_name=name,
+ format=format,
+ report_path=str(report_path),
+ )
+
+ def combine_coverage(self, names: list[str]) -> CoverageDataModel:
+ """Combine coverage data from multiple containers.
+
+ Useful for aggregating coverage across a test suite.
+
+ Args:
+ names: List of container names to combine
+ """
+ if not names:
+ raise ValueError("At least one container name required")
+
+ combined_dir = Path(HOST_COVERAGE_BASE) / "combined"
+ combined_dir.mkdir(parents=True, exist_ok=True)
+
+ # Clear any existing combined data
+ for f in combined_dir.glob(".coverage*"):
+ f.unlink()
+
+ # Copy all coverage files to combined directory
+ for name in names:
+ coverage_dir = self._get_coverage_dir(name)
+ for cov_file in coverage_dir.glob(".coverage*"):
+ # Ensure unique names when copying
+ dest_name = f".coverage.{name}.{cov_file.name}"
+ shutil.copy(cov_file, combined_dir / dest_name)
+
+ # Run coverage combine
+ subprocess.run(
+ ["coverage", "combine"],
+ cwd=combined_dir,
+ capture_output=True,
+ check=True,
+ )
+
+ # Generate summary
+ coverage_file = combined_dir / ".coverage"
+ result = subprocess.run(
+ ["coverage", "report", "--data-file", str(coverage_file)],
+ capture_output=True,
+ text=True,
+ )
+
+ summary = result.stdout if result.returncode == 0 else result.stderr
+ metrics = self._parse_coverage_summary(summary)
+
+ return CoverageDataModel(
+ container_name="combined",
+ coverage_file=str(coverage_file),
+ summary=summary,
+ lines_covered=metrics["lines_covered"],
+ lines_total=metrics["lines_total"],
+ coverage_percent=metrics["coverage_percent"],
+ )
+
+ def delete_coverage(
+ self,
+ name: str | None = None,
+ older_than_days: int | None = None,
+ ) -> int:
+ """Delete coverage data.
+
+ Args:
+ name: Delete specific container's coverage
+ older_than_days: Delete all coverage older than N days
+
+ Returns:
+ Number of coverage directories deleted
+ """
+ import time
+
+ deleted = 0
+ coverage_base = Path(HOST_COVERAGE_BASE)
+
+ if not coverage_base.exists():
+ return 0
+
+ if name is not None:
+ # Delete specific container's coverage
+ coverage_dir = coverage_base / name
+ if coverage_dir.exists():
+ shutil.rmtree(coverage_dir)
+ deleted += 1
+ elif older_than_days is not None:
+ # Delete coverage older than N days
+ cutoff = time.time() - (older_than_days * 86400)
+ for coverage_dir in coverage_base.iterdir():
+ if coverage_dir.is_dir():
+ if coverage_dir.stat().st_mtime < cutoff:
+ shutil.rmtree(coverage_dir)
+ deleted += 1
+ else:
+ # No filter - delete all
+ for coverage_dir in coverage_base.iterdir():
+ if coverage_dir.is_dir():
+ shutil.rmtree(coverage_dir)
+ deleted += 1
+
+ return deleted
diff --git a/tests/unit/test_docker_middleware.py b/tests/unit/test_docker_middleware.py
index 205e415..f4c18f8 100644
--- a/tests/unit/test_docker_middleware.py
+++ b/tests/unit/test_docker_middleware.py
@@ -180,7 +180,8 @@ class TestStopRemove:
mock_container = MagicMock()
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.stop("test") is True
- mock_container.stop.assert_called_once_with(timeout=10)
+ # Default timeout is 30s for graceful shutdown (coverage needs time)
+ mock_container.stop.assert_called_once_with(timeout=30)
def test_remove(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
@@ -242,3 +243,142 @@ class TestGetXmlRpcPort:
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.get_xmlrpc_port("test") == DEFAULT_XMLRPC_PORT
+
+
+class TestCoverage:
+ def test_launch_with_coverage_uses_coverage_image(
+ self, docker_mw, mock_docker_client, tmp_path
+ ):
+ from gnuradio_mcp.middlewares.docker import COVERAGE_IMAGE, RUNTIME_IMAGE
+
+ fg_file = tmp_path / "test.grc"
+ fg_file.write_text("")
+
+ mock_container = MagicMock()
+ mock_container.id = "abc123def456"
+ mock_docker_client.containers.run.return_value = mock_container
+
+ # Without coverage
+ docker_mw.launch(str(fg_file), "test-no-cov", enable_coverage=False)
+ call_args = mock_docker_client.containers.run.call_args
+ assert call_args.args[0] == RUNTIME_IMAGE
+
+ mock_docker_client.reset_mock()
+
+ # With coverage
+ docker_mw.launch(str(fg_file), "test-with-cov", enable_coverage=True)
+ call_args = mock_docker_client.containers.run.call_args
+ assert call_args.args[0] == COVERAGE_IMAGE
+
+ def test_launch_with_coverage_sets_env_and_label(
+ self, docker_mw, mock_docker_client, tmp_path
+ ):
+ fg_file = tmp_path / "test.grc"
+ fg_file.write_text("")
+
+ mock_container = MagicMock()
+ mock_container.id = "abc123def456"
+ mock_docker_client.containers.run.return_value = mock_container
+
+ result = docker_mw.launch(str(fg_file), "test-cov", enable_coverage=True)
+
+ call_kwargs = mock_docker_client.containers.run.call_args.kwargs
+ assert call_kwargs["environment"]["ENABLE_COVERAGE"] == "1"
+ assert call_kwargs["labels"]["gr-mcp.coverage-enabled"] == "1"
+ assert result.coverage_enabled is True
+
+ def test_launch_with_coverage_mounts_coverage_dir(
+ self, docker_mw, mock_docker_client, tmp_path
+ ):
+ from gnuradio_mcp.middlewares.docker import (
+ CONTAINER_COVERAGE_DIR,
+ HOST_COVERAGE_BASE,
+ )
+
+ fg_file = tmp_path / "test.grc"
+ fg_file.write_text("")
+
+ mock_container = MagicMock()
+ mock_container.id = "abc123def456"
+ mock_docker_client.containers.run.return_value = mock_container
+
+ docker_mw.launch(str(fg_file), "test-cov-mount", enable_coverage=True)
+
+ call_kwargs = mock_docker_client.containers.run.call_args.kwargs
+ volumes = call_kwargs["volumes"]
+ # Coverage directory should be mounted
+ coverage_host_path = f"{HOST_COVERAGE_BASE}/test-cov-mount"
+ assert coverage_host_path in volumes
+ assert volumes[coverage_host_path]["bind"] == CONTAINER_COVERAGE_DIR
+ assert volumes[coverage_host_path]["mode"] == "rw"
+
+ def test_list_containers_includes_coverage_enabled(
+ self, docker_mw, mock_docker_client
+ ):
+ mock_container_cov = MagicMock()
+ mock_container_cov.name = "with-cov"
+ mock_container_cov.id = "aaa111"
+ mock_container_cov.status = "running"
+ mock_container_cov.labels = {
+ "gr-mcp.flowgraph": "/test.grc",
+ "gr-mcp.xmlrpc-port": "8080",
+ "gr-mcp.vnc-enabled": "0",
+ "gr-mcp.coverage-enabled": "1",
+ }
+
+ mock_container_no_cov = MagicMock()
+ mock_container_no_cov.name = "no-cov"
+ mock_container_no_cov.id = "bbb222"
+ mock_container_no_cov.status = "running"
+ mock_container_no_cov.labels = {
+ "gr-mcp.flowgraph": "/test2.grc",
+ "gr-mcp.xmlrpc-port": "8081",
+ "gr-mcp.vnc-enabled": "0",
+ "gr-mcp.coverage-enabled": "0",
+ }
+
+ mock_docker_client.containers.list.return_value = [
+ mock_container_cov,
+ mock_container_no_cov,
+ ]
+
+ result = docker_mw.list_containers()
+ assert len(result) == 2
+ assert result[0].coverage_enabled is True
+ assert result[1].coverage_enabled is False
+
+ def test_is_coverage_enabled(self, docker_mw, mock_docker_client):
+ mock_container = MagicMock()
+ mock_container.labels = {"gr-mcp.coverage-enabled": "1"}
+ mock_docker_client.containers.get.return_value = mock_container
+
+ assert docker_mw.is_coverage_enabled("test") is True
+
+ mock_container.labels = {"gr-mcp.coverage-enabled": "0"}
+ assert docker_mw.is_coverage_enabled("test") is False
+
+ mock_container.labels = {}
+ assert docker_mw.is_coverage_enabled("test") is False
+
+ def test_get_coverage_dir(self, docker_mw):
+ from pathlib import Path
+
+ from gnuradio_mcp.middlewares.docker import HOST_COVERAGE_BASE
+
+ result = docker_mw.get_coverage_dir("my-container")
+ expected = Path(HOST_COVERAGE_BASE) / "my-container"
+ assert result == expected
+
+ def test_stop_with_timeout_warning(self, docker_mw, mock_docker_client, caplog):
+ import logging
+
+ mock_container = MagicMock()
+ mock_container.stop.side_effect = Exception("Timeout waiting for container")
+ mock_docker_client.containers.get.return_value = mock_container
+
+ with caplog.at_level(logging.WARNING):
+ result = docker_mw.stop("test")
+
+ # Should still return True (container will be killed)
+ assert result is True
+ assert "didn't stop gracefully" in caplog.text
diff --git a/tests/unit/test_runtime_provider.py b/tests/unit/test_runtime_provider.py
index 37264c2..0df5963 100644
--- a/tests/unit/test_runtime_provider.py
+++ b/tests/unit/test_runtime_provider.py
@@ -125,6 +125,7 @@ class TestContainerLifecycle:
name="my-fg",
xmlrpc_port=9090,
enable_vnc=True,
+ enable_coverage=False,
device_paths=None,
)
@@ -324,3 +325,251 @@ class TestVisualFeedback:
def test_get_container_logs_requires_container(self, provider_with_docker):
with pytest.raises(RuntimeError, match="No container specified"):
provider_with_docker.get_container_logs()
+
+
+class TestCoverageCollection:
+ """Tests for coverage collection methods."""
+
+ def test_launch_with_coverage(self, provider_with_docker, mock_docker_mw, tmp_path):
+ fg = tmp_path / "test.grc"
+ fg.write_text("")
+
+ provider_with_docker.launch_flowgraph(
+ flowgraph_path=str(fg),
+ name="cov-test",
+ enable_coverage=True,
+ )
+
+ mock_docker_mw.launch.assert_called_once()
+ call_kwargs = mock_docker_mw.launch.call_args.kwargs
+ assert call_kwargs["enable_coverage"] is True
+
+ def test_collect_coverage_no_data(self, provider_with_docker):
+ with pytest.raises(FileNotFoundError, match="No coverage data"):
+ provider_with_docker.collect_coverage("nonexistent-container")
+
+ def test_collect_coverage_success(self, provider_with_docker, tmp_path, monkeypatch):
+ from gnuradio_mcp.models import CoverageDataModel
+ from gnuradio_mcp.middlewares.docker import HOST_COVERAGE_BASE
+
+ # Create fake coverage directory and file
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+ coverage_dir = tmp_path / "test-container"
+ coverage_dir.mkdir()
+ (coverage_dir / ".coverage").write_bytes(b"fake coverage data")
+
+ # Mock subprocess to return fake coverage report
+ def mock_run(cmd, **kwargs):
+ class FakeResult:
+ stdout = """Name Stmts Miss Branch BrPart Cover
+-----------------------------------------------
+module.py 100 20 40 10 75%
+-----------------------------------------------
+TOTAL 100 20 40 10 75%"""
+ stderr = ""
+ returncode = 0
+
+ return FakeResult()
+
+ monkeypatch.setattr("subprocess.run", mock_run)
+
+ result = provider_with_docker.collect_coverage("test-container")
+
+ assert isinstance(result, CoverageDataModel)
+ assert result.container_name == "test-container"
+ assert result.coverage_percent == 75.0
+ assert result.lines_total == 100
+ assert result.lines_covered == 80 # 100 - 20 missed
+
+ def test_generate_coverage_report_html(self, provider_with_docker, tmp_path, monkeypatch):
+ from gnuradio_mcp.models import CoverageReportModel
+
+ # Setup
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+ coverage_dir = tmp_path / "test-container"
+ coverage_dir.mkdir()
+ (coverage_dir / ".coverage").write_bytes(b"fake coverage data")
+
+ # Mock subprocess
+ def mock_run(cmd, **kwargs):
+ class FakeResult:
+ returncode = 0
+
+ # Create output file for HTML
+ if "html" in cmd:
+ html_dir = coverage_dir / "htmlcov"
+ html_dir.mkdir(exist_ok=True)
+ (html_dir / "index.html").write_text("Coverage")
+ return FakeResult()
+
+ monkeypatch.setattr("subprocess.run", mock_run)
+
+ result = provider_with_docker.generate_coverage_report("test-container", "html")
+
+ assert isinstance(result, CoverageReportModel)
+ assert result.format == "html"
+ assert "htmlcov" in result.report_path
+
+ def test_generate_coverage_report_xml(self, provider_with_docker, tmp_path, monkeypatch):
+ from gnuradio_mcp.models import CoverageReportModel
+
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+ coverage_dir = tmp_path / "test-container"
+ coverage_dir.mkdir()
+ (coverage_dir / ".coverage").write_bytes(b"fake coverage data")
+
+ def mock_run(cmd, **kwargs):
+ class FakeResult:
+ returncode = 0
+
+ return FakeResult()
+
+ monkeypatch.setattr("subprocess.run", mock_run)
+
+ result = provider_with_docker.generate_coverage_report("test-container", "xml")
+
+ assert isinstance(result, CoverageReportModel)
+ assert result.format == "xml"
+ assert "coverage.xml" in result.report_path
+
+ def test_generate_coverage_report_requires_coverage_file(
+ self, provider_with_docker, tmp_path, monkeypatch
+ ):
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+ coverage_dir = tmp_path / "test-container"
+ coverage_dir.mkdir()
+ # No .coverage file
+
+ with pytest.raises(FileNotFoundError, match="No combined coverage file"):
+ provider_with_docker.generate_coverage_report("test-container", "html")
+
+ def test_combine_coverage(self, provider_with_docker, tmp_path, monkeypatch):
+ from gnuradio_mcp.models import CoverageDataModel
+
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+
+ # Create two containers with coverage data
+ for name in ["container-1", "container-2"]:
+ coverage_dir = tmp_path / name
+ coverage_dir.mkdir()
+ (coverage_dir / ".coverage").write_bytes(b"fake coverage")
+
+ def mock_run(cmd, **kwargs):
+ class FakeResult:
+ stdout = "TOTAL 200 40 80 20 75%"
+ stderr = ""
+ returncode = 0
+
+ # Create combined coverage file
+ if "combine" in cmd:
+ combined_dir = tmp_path / "combined"
+ combined_dir.mkdir(exist_ok=True)
+ (combined_dir / ".coverage").write_bytes(b"combined data")
+ return FakeResult()
+
+ monkeypatch.setattr("subprocess.run", mock_run)
+
+ result = provider_with_docker.combine_coverage(["container-1", "container-2"])
+
+ assert isinstance(result, CoverageDataModel)
+ assert result.container_name == "combined"
+
+ def test_combine_coverage_requires_names(self, provider_with_docker):
+ with pytest.raises(ValueError, match="At least one container"):
+ provider_with_docker.combine_coverage([])
+
+ def test_delete_coverage_specific(self, provider_with_docker, tmp_path, monkeypatch):
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+
+ # Create coverage directory
+ coverage_dir = tmp_path / "test-container"
+ coverage_dir.mkdir()
+ (coverage_dir / ".coverage").write_bytes(b"data")
+
+ deleted = provider_with_docker.delete_coverage(name="test-container")
+
+ assert deleted == 1
+ assert not coverage_dir.exists()
+
+ def test_delete_coverage_older_than(self, provider_with_docker, tmp_path, monkeypatch):
+ import os
+ import time
+
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+
+ # Create old and new coverage directories
+ old_dir = tmp_path / "old-container"
+ old_dir.mkdir()
+ # Set mtime to 10 days ago
+ old_time = time.time() - (10 * 86400)
+ os.utime(old_dir, (old_time, old_time))
+
+ new_dir = tmp_path / "new-container"
+ new_dir.mkdir()
+
+ deleted = provider_with_docker.delete_coverage(older_than_days=7)
+
+ assert deleted == 1
+ assert not old_dir.exists()
+ assert new_dir.exists()
+
+ def test_delete_coverage_all(self, provider_with_docker, tmp_path, monkeypatch):
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+
+ # Create multiple directories
+ (tmp_path / "container-1").mkdir()
+ (tmp_path / "container-2").mkdir()
+
+ deleted = provider_with_docker.delete_coverage()
+
+ assert deleted == 2
+ assert not (tmp_path / "container-1").exists()
+ assert not (tmp_path / "container-2").exists()
+
+ def test_delete_coverage_nonexistent(self, provider_with_docker, tmp_path, monkeypatch):
+ monkeypatch.setattr(
+ "gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
+ )
+ # tmp_path exists but is empty
+
+ deleted = provider_with_docker.delete_coverage(name="nonexistent")
+ assert deleted == 0
+
+ def test_parse_coverage_summary(self, provider_with_docker):
+ summary = """Name Stmts Miss Branch BrPart Cover
+-----------------------------------------------
+module.py 150 30 60 15 80%
+other.py 50 20 20 5 60%
+-----------------------------------------------
+TOTAL 200 50 80 20 75%"""
+
+ metrics = provider_with_docker._parse_coverage_summary(summary)
+
+ assert metrics["lines_total"] == 200
+ assert metrics["lines_covered"] == 150 # 200 - 50 missed
+ assert metrics["coverage_percent"] == 75.0
+
+ def test_parse_coverage_summary_no_total(self, provider_with_docker):
+ summary = "No coverage data collected"
+
+ metrics = provider_with_docker._parse_coverage_summary(summary)
+
+ assert metrics["lines_total"] is None
+ assert metrics["lines_covered"] is None
+ assert metrics["coverage_percent"] is None