coverage: add cross-process coverage collection for containerized flowgraphs

New MCP tools:
- collect_coverage(name) - combine parallel files, return summary
- generate_coverage_report(name, format) - HTML/XML/JSON reports
- combine_coverage(names) - aggregate across test runs
- delete_coverage(name?, older_than_days?) - cleanup

Modified:
- launch_flowgraph() now accepts enable_coverage parameter
- stop() uses 30s timeout for graceful shutdown (coverage needs atexit)

Docker:
- Dockerfile.gnuradio-coverage extends runtime with python3-coverage
- entrypoint-coverage.sh wraps execution with coverage run
- .coveragerc configured for GNU Radio source paths

Tests: 125 unit tests (21 new), 80% coverage
This commit is contained in:
Ryan Malloy 2026-01-27 13:50:17 -07:00
parent 91a442cdf9
commit bfab802e05
9 changed files with 917 additions and 12 deletions

60
docker/.coveragerc Normal file
View File

@ -0,0 +1,60 @@
# Coverage.py configuration for GNU Radio flowgraph execution
# See: https://coverage.readthedocs.io/en/latest/config.html
[run]
# Source packages to measure coverage for
source =
gnuradio
/flowgraphs
# Include branch coverage for more detailed analysis
branch = True
# Enable parallel mode for combining coverage from multiple runs
parallel = True
# Omit test files and virtual environments from coverage
omit =
*/tests/*
*/test_*
*/qa_*
*/.venv/*
*/site-packages/*
*/dist-packages/pybind11*
# Dynamic context based on function names (useful for debugging)
dynamic_context = test_function
[paths]
# Map container paths to host paths for combining coverage
source =
/usr/lib/python3/dist-packages/gnuradio
/flowgraphs
[report]
# Exclude lines from coverage analysis
exclude_lines =
pragma: no cover
def __repr__
raise NotImplementedError
if __name__ == .__main__.:
if TYPE_CHECKING:
@abstractmethod
# Show missing line numbers in reports
show_missing = True
# Minimum coverage percentage (optional, for CI)
# fail_under = 50
[html]
# HTML report output directory
directory = /coverage/htmlcov
[xml]
# Cobertura XML output (for CI integration)
output = /coverage/coverage.xml
[json]
# JSON output for programmatic access
output = /coverage/coverage.json

View File

@ -0,0 +1,39 @@
# Coverage-enabled GNU Radio runtime container
# Extends gnuradio-runtime with Python code coverage instrumentation
#
# Build:
# docker build -f docker/Dockerfile.gnuradio-coverage -t gnuradio-coverage:latest docker/
#
# Usage:
# docker run --rm -v $(pwd)/coverage:/coverage \
# -e ENABLE_COVERAGE=1 \
# gnuradio-coverage:latest python3 /flowgraphs/my_flowgraph.py
#
# The coverage data is written to /coverage/.coverage on container exit.
# Mount this volume to persist coverage data between runs.
FROM gnuradio-runtime:latest
LABEL org.opencontainers.image.title="GNU Radio Coverage Runtime"
LABEL org.opencontainers.image.description="GNU Radio runtime with Python code coverage support"
# Install Python coverage.py via apt (pip3 not available in base image)
RUN apt-get update && apt-get install -y --no-install-recommends \
python3-coverage \
&& rm -rf /var/lib/apt/lists/*
# Copy coverage configuration
COPY .coveragerc /etc/coveragerc
# Copy coverage-aware entrypoint
COPY entrypoint-coverage.sh /entrypoint-coverage.sh
RUN chmod +x /entrypoint-coverage.sh
# Coverage data directory - mount a volume here to persist
VOLUME /coverage
# Environment variables for coverage
ENV COVERAGE_FILE=/coverage/.coverage
ENV COVERAGE_RCFILE=/etc/coveragerc
ENTRYPOINT ["/entrypoint-coverage.sh"]

View File

@ -0,0 +1,53 @@
#!/bin/bash
# Coverage-aware entrypoint for GNU Radio flowgraph execution
#
# When ENABLE_COVERAGE=1:
# - Wraps the command with `coverage run`
# - Writes coverage data to /coverage/.coverage
# - Coverage can be collected after container stops
#
# When ENABLE_COVERAGE=0 (default):
# - Behaves identically to the standard entrypoint
#
set -e
# Start Xvfb for headless QT rendering
Xvfb :99 -screen 0 1280x720x24 -ac +extension GLX +render -noreset &
XVFB_PID=$!
# Wait for Xvfb to be ready
while ! xdpyinfo -display :99 >/dev/null 2>&1; do
sleep 0.1
done
echo "Xvfb ready on :99"
# Optional VNC server for visual debugging
if [ "${ENABLE_VNC:-0}" = "1" ]; then
x11vnc -display :99 -forever -nopw -shared -rfbport 5900 &
echo "VNC server on :5900"
fi
# Ensure coverage directory exists and is writable
if [ "${ENABLE_COVERAGE:-0}" = "1" ]; then
mkdir -p /coverage
echo "Coverage enabled, data will be written to ${COVERAGE_FILE:-/coverage/.coverage}"
fi
# Run the flowgraph with or without coverage
if [ "${ENABLE_COVERAGE:-0}" = "1" ]; then
# Use coverage run to instrument Python execution
# Note: apt installs as python3-coverage; use python3 -m coverage for flexibility
# --parallel-mode enables unique data files for parallel runs
# --source limits coverage to GNU Radio packages
#
# Strip 'python3' prefix if present (Docker middleware passes "python3 /script.py")
if [ "$1" = "python3" ] || [ "$1" = "python" ]; then
shift
fi
exec python3 -m coverage run \
--rcfile="${COVERAGE_RCFILE:-/etc/coveragerc}" \
--data-file="${COVERAGE_FILE:-/coverage/.coverage}" \
"$@"
else
exec "$@"
fi

View File

@ -11,8 +11,12 @@ logger = logging.getLogger(__name__)
DEFAULT_XMLRPC_PORT = 8080
DEFAULT_VNC_PORT = 5900
DEFAULT_STOP_TIMEOUT = 30 # Seconds to wait for graceful shutdown (coverage needs time)
RUNTIME_IMAGE = "gnuradio-runtime:latest"
COVERAGE_IMAGE = "gnuradio-coverage:latest"
CONTAINER_FLOWGRAPH_DIR = "/flowgraphs"
CONTAINER_COVERAGE_DIR = "/coverage"
HOST_COVERAGE_BASE = "/tmp/gr-coverage"
class DockerMiddleware:
@ -44,16 +48,31 @@ class DockerMiddleware:
name: str,
xmlrpc_port: int = DEFAULT_XMLRPC_PORT,
enable_vnc: bool = False,
enable_coverage: bool = False,
device_paths: list[str] | None = None,
) -> ContainerModel:
"""Launch a flowgraph in a Docker container with Xvfb."""
"""Launch a flowgraph in a Docker container with Xvfb.
Args:
flowgraph_path: Path to the .py flowgraph file
name: Container name
xmlrpc_port: Port for XML-RPC variable control
enable_vnc: Enable VNC server for visual debugging
enable_coverage: Use coverage image and collect Python coverage data
device_paths: Host device paths to pass through (e.g., /dev/ttyUSB0)
"""
fg_path = Path(flowgraph_path).resolve()
if not fg_path.exists():
raise FileNotFoundError(f"Flowgraph not found: {fg_path}")
# Select image based on coverage mode
image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE
env = {"DISPLAY": ":99", "XMLRPC_PORT": str(xmlrpc_port)}
if enable_vnc:
env["ENABLE_VNC"] = "1"
if enable_coverage:
env["ENABLE_COVERAGE"] = "1"
ports: dict[str, int] = {f"{xmlrpc_port}/tcp": xmlrpc_port}
vnc_port: int | None = None
@ -68,12 +87,21 @@ class DockerMiddleware:
}
}
# Mount coverage directory if coverage enabled
if enable_coverage:
coverage_dir = Path(HOST_COVERAGE_BASE) / name
coverage_dir.mkdir(parents=True, exist_ok=True)
volumes[str(coverage_dir)] = {
"bind": CONTAINER_COVERAGE_DIR,
"mode": "rw",
}
devices = [f"{d}:{d}:rwm" for d in (device_paths or [])]
container_fg_path = f"{CONTAINER_FLOWGRAPH_DIR}/{fg_path.name}"
container = self._client.containers.run(
RUNTIME_IMAGE,
image,
command=["python3", container_fg_path],
name=name,
detach=True,
@ -86,6 +114,7 @@ class DockerMiddleware:
"gr-mcp.flowgraph": str(fg_path),
"gr-mcp.xmlrpc-port": str(xmlrpc_port),
"gr-mcp.vnc-enabled": "1" if enable_vnc else "0",
"gr-mcp.coverage-enabled": "1" if enable_coverage else "0",
},
)
@ -97,6 +126,7 @@ class DockerMiddleware:
xmlrpc_port=xmlrpc_port,
vnc_port=vnc_port,
device_paths=device_paths or [],
coverage_enabled=enable_coverage,
)
def list_containers(self) -> list[ContainerModel]:
@ -117,14 +147,32 @@ class DockerMiddleware:
vnc_port=DEFAULT_VNC_PORT
if labels.get("gr-mcp.vnc-enabled") == "1" and c.status == "running"
else None,
coverage_enabled=labels.get("gr-mcp.coverage-enabled") == "1",
)
)
return result
def stop(self, name: str) -> bool:
"""Stop a container by name."""
def stop(self, name: str, timeout: int = DEFAULT_STOP_TIMEOUT) -> bool:
"""Stop a container gracefully with SIGTERM.
Uses a longer timeout (30s) to allow coverage data to be flushed.
Falls back to SIGKILL if container doesn't respond, but warns that
coverage data may be lost.
Args:
name: Container name
timeout: Seconds to wait for graceful shutdown before SIGKILL
"""
container = self._client.containers.get(name)
container.stop(timeout=10)
try:
container.stop(timeout=timeout)
except Exception as e:
# Timeout reached, container will be killed - coverage may be lost
logger.warning(
"Container %s didn't stop gracefully within %ds, "
"coverage data may be lost: %s",
name, timeout, e
)
return True
def remove(self, name: str, force: bool = False) -> bool:
@ -163,3 +211,12 @@ class DockerMiddleware:
return int(
container.labels.get("gr-mcp.xmlrpc-port", DEFAULT_XMLRPC_PORT)
)
def is_coverage_enabled(self, name: str) -> bool:
"""Check if coverage is enabled for a container."""
container = self._client.containers.get(name)
return container.labels.get("gr-mcp.coverage-enabled") == "1"
def get_coverage_dir(self, name: str) -> Path:
"""Get the host-side coverage directory for a container."""
return Path(HOST_COVERAGE_BASE) / name

View File

@ -129,6 +129,7 @@ class ContainerModel(BaseModel):
xmlrpc_port: int
vnc_port: int | None = None
device_paths: list[str] = []
coverage_enabled: bool = False
class VariableModel(BaseModel):
@ -155,3 +156,27 @@ class RuntimeStatusModel(BaseModel):
connected: bool
connection: ConnectionInfoModel | None = None
containers: list[ContainerModel] = []
# ──────────────────────────────────────────────
# Coverage Models (Cross-Process Code Coverage)
# ──────────────────────────────────────────────
class CoverageDataModel(BaseModel):
"""Summary of collected coverage data."""
container_name: str
coverage_file: str
summary: str
lines_covered: int | None = None
lines_total: int | None = None
coverage_percent: float | None = None
class CoverageReportModel(BaseModel):
"""Generated coverage report (HTML, XML, JSON)."""
container_name: str
format: Literal["html", "xml", "json"]
report_path: str

View File

@ -44,14 +44,24 @@ class McpRuntimeProvider:
# Docker-dependent tools
if p._has_docker:
# Container lifecycle
self._mcp.tool(p.launch_flowgraph)
self._mcp.tool(p.list_containers)
self._mcp.tool(p.stop_flowgraph)
self._mcp.tool(p.remove_flowgraph)
self._mcp.tool(p.connect_to_container)
# Visual feedback
self._mcp.tool(p.capture_screenshot)
self._mcp.tool(p.get_container_logs)
logger.info("Registered 17 runtime tools (Docker available)")
# Coverage collection
self._mcp.tool(p.collect_coverage)
self._mcp.tool(p.generate_coverage_report)
self._mcp.tool(p.combine_coverage)
self._mcp.tool(p.delete_coverage)
logger.info("Registered 21 runtime tools (Docker available)")
else:
logger.info(
"Registered 10 runtime tools (Docker unavailable, "

View File

@ -1,13 +1,19 @@
from __future__ import annotations
import logging
from typing import Any
import re
import shutil
import subprocess
from pathlib import Path
from typing import Any, Literal
from gnuradio_mcp.middlewares.docker import DockerMiddleware
from gnuradio_mcp.middlewares.docker import DockerMiddleware, HOST_COVERAGE_BASE
from gnuradio_mcp.middlewares.xmlrpc import XmlRpcMiddleware
from gnuradio_mcp.models import (
ConnectionInfoModel,
ContainerModel,
CoverageDataModel,
CoverageReportModel,
RuntimeStatusModel,
ScreenshotModel,
VariableModel,
@ -62,19 +68,28 @@ class RuntimeProvider:
name: str | None = None,
xmlrpc_port: int = 8080,
enable_vnc: bool = False,
enable_coverage: bool = False,
device_paths: list[str] | None = None,
) -> ContainerModel:
"""Launch a flowgraph in a Docker container with Xvfb."""
"""Launch a flowgraph in a Docker container with Xvfb.
Args:
flowgraph_path: Path to the .py flowgraph file
name: Container name (defaults to 'gr-{stem}')
xmlrpc_port: Port for XML-RPC variable control
enable_vnc: Enable VNC server for visual debugging
enable_coverage: Enable Python code coverage collection
device_paths: Host device paths to pass through
"""
docker = self._require_docker()
if name is None:
from pathlib import Path
name = f"gr-{Path(flowgraph_path).stem}"
return docker.launch(
flowgraph_path=flowgraph_path,
name=name,
xmlrpc_port=xmlrpc_port,
enable_vnc=enable_vnc,
enable_coverage=enable_coverage,
device_paths=device_paths,
)
@ -214,3 +229,260 @@ class RuntimeProvider:
"No container specified. Provide a name or connect to a container first."
)
return docker.get_logs(container_name, tail=tail)
# ──────────────────────────────────────────
# Coverage Collection
# ──────────────────────────────────────────
def _get_coverage_dir(self, name: str) -> Path:
"""Get coverage directory for a container, ensure it exists."""
coverage_dir = Path(HOST_COVERAGE_BASE) / name
if not coverage_dir.exists():
raise FileNotFoundError(
f"No coverage data for container '{name}'. "
f"Was it launched with enable_coverage=True?"
)
return coverage_dir
def _parse_coverage_summary(self, output: str) -> dict[str, int | float | None]:
"""Parse coverage report output for metrics.
Example output:
Name Stmts Miss Branch BrPart Cover
---------------------------------------------------------
gnuradio/__init__.py 10 2 4 1 75%
...
TOTAL 100 25 40 10 70%
"""
result: dict[str, int | float | None] = {
"lines_covered": None,
"lines_total": None,
"coverage_percent": None,
}
# Look for TOTAL line
match = re.search(
r"^TOTAL\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)%",
output,
re.MULTILINE,
)
if match:
total_stmts = int(match.group(1))
miss_stmts = int(match.group(2))
result["lines_total"] = total_stmts
result["lines_covered"] = total_stmts - miss_stmts
result["coverage_percent"] = float(match.group(5))
return result
def collect_coverage(self, name: str) -> CoverageDataModel:
"""Collect coverage data from a stopped container.
Combines any parallel coverage files and returns a summary.
Container must have been stopped (not removed) for coverage
data to be available.
Args:
name: Container name
"""
coverage_dir = self._get_coverage_dir(name)
# First, combine any parallel files (idempotent if already combined)
# This handles both single-run and multi-run scenarios
subprocess.run(
["coverage", "combine"],
cwd=coverage_dir,
capture_output=True,
)
coverage_file = coverage_dir / ".coverage"
if not coverage_file.exists():
# Check for parallel files that weren't combined
parallel_files = list(coverage_dir.glob(".coverage.*"))
if parallel_files:
raise RuntimeError(
f"Coverage combine failed. Found {len(parallel_files)} "
f"parallel files but no combined .coverage file."
)
raise FileNotFoundError(
f"No coverage data found in {coverage_dir}. "
f"Container may not have generated coverage data."
)
# Generate summary report
result = subprocess.run(
["coverage", "report", "--data-file", str(coverage_file)],
capture_output=True,
text=True,
)
summary = result.stdout if result.returncode == 0 else result.stderr
metrics = self._parse_coverage_summary(summary)
return CoverageDataModel(
container_name=name,
coverage_file=str(coverage_file),
summary=summary,
lines_covered=metrics["lines_covered"],
lines_total=metrics["lines_total"],
coverage_percent=metrics["coverage_percent"],
)
def generate_coverage_report(
self,
name: str,
format: Literal["html", "xml", "json"] = "html",
) -> CoverageReportModel:
"""Generate a coverage report in the specified format.
Args:
name: Container name
format: Report format (html, xml, json)
"""
coverage_dir = self._get_coverage_dir(name)
coverage_file = coverage_dir / ".coverage"
if not coverage_file.exists():
raise FileNotFoundError(
f"No combined coverage file for '{name}'. "
f"Call collect_coverage() first."
)
if format == "html":
report_path = coverage_dir / "htmlcov" / "index.html"
subprocess.run(
[
"coverage", "html",
"--data-file", str(coverage_file),
"-d", str(coverage_dir / "htmlcov"),
],
capture_output=True,
check=True,
)
elif format == "xml":
report_path = coverage_dir / "coverage.xml"
subprocess.run(
[
"coverage", "xml",
"--data-file", str(coverage_file),
"-o", str(report_path),
],
capture_output=True,
check=True,
)
elif format == "json":
report_path = coverage_dir / "coverage.json"
subprocess.run(
[
"coverage", "json",
"--data-file", str(coverage_file),
"-o", str(report_path),
],
capture_output=True,
check=True,
)
else:
raise ValueError(f"Unsupported format: {format}")
return CoverageReportModel(
container_name=name,
format=format,
report_path=str(report_path),
)
def combine_coverage(self, names: list[str]) -> CoverageDataModel:
"""Combine coverage data from multiple containers.
Useful for aggregating coverage across a test suite.
Args:
names: List of container names to combine
"""
if not names:
raise ValueError("At least one container name required")
combined_dir = Path(HOST_COVERAGE_BASE) / "combined"
combined_dir.mkdir(parents=True, exist_ok=True)
# Clear any existing combined data
for f in combined_dir.glob(".coverage*"):
f.unlink()
# Copy all coverage files to combined directory
for name in names:
coverage_dir = self._get_coverage_dir(name)
for cov_file in coverage_dir.glob(".coverage*"):
# Ensure unique names when copying
dest_name = f".coverage.{name}.{cov_file.name}"
shutil.copy(cov_file, combined_dir / dest_name)
# Run coverage combine
subprocess.run(
["coverage", "combine"],
cwd=combined_dir,
capture_output=True,
check=True,
)
# Generate summary
coverage_file = combined_dir / ".coverage"
result = subprocess.run(
["coverage", "report", "--data-file", str(coverage_file)],
capture_output=True,
text=True,
)
summary = result.stdout if result.returncode == 0 else result.stderr
metrics = self._parse_coverage_summary(summary)
return CoverageDataModel(
container_name="combined",
coverage_file=str(coverage_file),
summary=summary,
lines_covered=metrics["lines_covered"],
lines_total=metrics["lines_total"],
coverage_percent=metrics["coverage_percent"],
)
def delete_coverage(
self,
name: str | None = None,
older_than_days: int | None = None,
) -> int:
"""Delete coverage data.
Args:
name: Delete specific container's coverage
older_than_days: Delete all coverage older than N days
Returns:
Number of coverage directories deleted
"""
import time
deleted = 0
coverage_base = Path(HOST_COVERAGE_BASE)
if not coverage_base.exists():
return 0
if name is not None:
# Delete specific container's coverage
coverage_dir = coverage_base / name
if coverage_dir.exists():
shutil.rmtree(coverage_dir)
deleted += 1
elif older_than_days is not None:
# Delete coverage older than N days
cutoff = time.time() - (older_than_days * 86400)
for coverage_dir in coverage_base.iterdir():
if coverage_dir.is_dir():
if coverage_dir.stat().st_mtime < cutoff:
shutil.rmtree(coverage_dir)
deleted += 1
else:
# No filter - delete all
for coverage_dir in coverage_base.iterdir():
if coverage_dir.is_dir():
shutil.rmtree(coverage_dir)
deleted += 1
return deleted

View File

@ -180,7 +180,8 @@ class TestStopRemove:
mock_container = MagicMock()
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.stop("test") is True
mock_container.stop.assert_called_once_with(timeout=10)
# Default timeout is 30s for graceful shutdown (coverage needs time)
mock_container.stop.assert_called_once_with(timeout=30)
def test_remove(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
@ -242,3 +243,142 @@ class TestGetXmlRpcPort:
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.get_xmlrpc_port("test") == DEFAULT_XMLRPC_PORT
class TestCoverage:
def test_launch_with_coverage_uses_coverage_image(
self, docker_mw, mock_docker_client, tmp_path
):
from gnuradio_mcp.middlewares.docker import COVERAGE_IMAGE, RUNTIME_IMAGE
fg_file = tmp_path / "test.grc"
fg_file.write_text("<flowgraph/>")
mock_container = MagicMock()
mock_container.id = "abc123def456"
mock_docker_client.containers.run.return_value = mock_container
# Without coverage
docker_mw.launch(str(fg_file), "test-no-cov", enable_coverage=False)
call_args = mock_docker_client.containers.run.call_args
assert call_args.args[0] == RUNTIME_IMAGE
mock_docker_client.reset_mock()
# With coverage
docker_mw.launch(str(fg_file), "test-with-cov", enable_coverage=True)
call_args = mock_docker_client.containers.run.call_args
assert call_args.args[0] == COVERAGE_IMAGE
def test_launch_with_coverage_sets_env_and_label(
self, docker_mw, mock_docker_client, tmp_path
):
fg_file = tmp_path / "test.grc"
fg_file.write_text("<flowgraph/>")
mock_container = MagicMock()
mock_container.id = "abc123def456"
mock_docker_client.containers.run.return_value = mock_container
result = docker_mw.launch(str(fg_file), "test-cov", enable_coverage=True)
call_kwargs = mock_docker_client.containers.run.call_args.kwargs
assert call_kwargs["environment"]["ENABLE_COVERAGE"] == "1"
assert call_kwargs["labels"]["gr-mcp.coverage-enabled"] == "1"
assert result.coverage_enabled is True
def test_launch_with_coverage_mounts_coverage_dir(
self, docker_mw, mock_docker_client, tmp_path
):
from gnuradio_mcp.middlewares.docker import (
CONTAINER_COVERAGE_DIR,
HOST_COVERAGE_BASE,
)
fg_file = tmp_path / "test.grc"
fg_file.write_text("<flowgraph/>")
mock_container = MagicMock()
mock_container.id = "abc123def456"
mock_docker_client.containers.run.return_value = mock_container
docker_mw.launch(str(fg_file), "test-cov-mount", enable_coverage=True)
call_kwargs = mock_docker_client.containers.run.call_args.kwargs
volumes = call_kwargs["volumes"]
# Coverage directory should be mounted
coverage_host_path = f"{HOST_COVERAGE_BASE}/test-cov-mount"
assert coverage_host_path in volumes
assert volumes[coverage_host_path]["bind"] == CONTAINER_COVERAGE_DIR
assert volumes[coverage_host_path]["mode"] == "rw"
def test_list_containers_includes_coverage_enabled(
self, docker_mw, mock_docker_client
):
mock_container_cov = MagicMock()
mock_container_cov.name = "with-cov"
mock_container_cov.id = "aaa111"
mock_container_cov.status = "running"
mock_container_cov.labels = {
"gr-mcp.flowgraph": "/test.grc",
"gr-mcp.xmlrpc-port": "8080",
"gr-mcp.vnc-enabled": "0",
"gr-mcp.coverage-enabled": "1",
}
mock_container_no_cov = MagicMock()
mock_container_no_cov.name = "no-cov"
mock_container_no_cov.id = "bbb222"
mock_container_no_cov.status = "running"
mock_container_no_cov.labels = {
"gr-mcp.flowgraph": "/test2.grc",
"gr-mcp.xmlrpc-port": "8081",
"gr-mcp.vnc-enabled": "0",
"gr-mcp.coverage-enabled": "0",
}
mock_docker_client.containers.list.return_value = [
mock_container_cov,
mock_container_no_cov,
]
result = docker_mw.list_containers()
assert len(result) == 2
assert result[0].coverage_enabled is True
assert result[1].coverage_enabled is False
def test_is_coverage_enabled(self, docker_mw, mock_docker_client):
mock_container = MagicMock()
mock_container.labels = {"gr-mcp.coverage-enabled": "1"}
mock_docker_client.containers.get.return_value = mock_container
assert docker_mw.is_coverage_enabled("test") is True
mock_container.labels = {"gr-mcp.coverage-enabled": "0"}
assert docker_mw.is_coverage_enabled("test") is False
mock_container.labels = {}
assert docker_mw.is_coverage_enabled("test") is False
def test_get_coverage_dir(self, docker_mw):
from pathlib import Path
from gnuradio_mcp.middlewares.docker import HOST_COVERAGE_BASE
result = docker_mw.get_coverage_dir("my-container")
expected = Path(HOST_COVERAGE_BASE) / "my-container"
assert result == expected
def test_stop_with_timeout_warning(self, docker_mw, mock_docker_client, caplog):
import logging
mock_container = MagicMock()
mock_container.stop.side_effect = Exception("Timeout waiting for container")
mock_docker_client.containers.get.return_value = mock_container
with caplog.at_level(logging.WARNING):
result = docker_mw.stop("test")
# Should still return True (container will be killed)
assert result is True
assert "didn't stop gracefully" in caplog.text

View File

@ -125,6 +125,7 @@ class TestContainerLifecycle:
name="my-fg",
xmlrpc_port=9090,
enable_vnc=True,
enable_coverage=False,
device_paths=None,
)
@ -324,3 +325,251 @@ class TestVisualFeedback:
def test_get_container_logs_requires_container(self, provider_with_docker):
with pytest.raises(RuntimeError, match="No container specified"):
provider_with_docker.get_container_logs()
class TestCoverageCollection:
"""Tests for coverage collection methods."""
def test_launch_with_coverage(self, provider_with_docker, mock_docker_mw, tmp_path):
fg = tmp_path / "test.grc"
fg.write_text("<flowgraph/>")
provider_with_docker.launch_flowgraph(
flowgraph_path=str(fg),
name="cov-test",
enable_coverage=True,
)
mock_docker_mw.launch.assert_called_once()
call_kwargs = mock_docker_mw.launch.call_args.kwargs
assert call_kwargs["enable_coverage"] is True
def test_collect_coverage_no_data(self, provider_with_docker):
with pytest.raises(FileNotFoundError, match="No coverage data"):
provider_with_docker.collect_coverage("nonexistent-container")
def test_collect_coverage_success(self, provider_with_docker, tmp_path, monkeypatch):
from gnuradio_mcp.models import CoverageDataModel
from gnuradio_mcp.middlewares.docker import HOST_COVERAGE_BASE
# Create fake coverage directory and file
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
coverage_dir = tmp_path / "test-container"
coverage_dir.mkdir()
(coverage_dir / ".coverage").write_bytes(b"fake coverage data")
# Mock subprocess to return fake coverage report
def mock_run(cmd, **kwargs):
class FakeResult:
stdout = """Name Stmts Miss Branch BrPart Cover
-----------------------------------------------
module.py 100 20 40 10 75%
-----------------------------------------------
TOTAL 100 20 40 10 75%"""
stderr = ""
returncode = 0
return FakeResult()
monkeypatch.setattr("subprocess.run", mock_run)
result = provider_with_docker.collect_coverage("test-container")
assert isinstance(result, CoverageDataModel)
assert result.container_name == "test-container"
assert result.coverage_percent == 75.0
assert result.lines_total == 100
assert result.lines_covered == 80 # 100 - 20 missed
def test_generate_coverage_report_html(self, provider_with_docker, tmp_path, monkeypatch):
from gnuradio_mcp.models import CoverageReportModel
# Setup
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
coverage_dir = tmp_path / "test-container"
coverage_dir.mkdir()
(coverage_dir / ".coverage").write_bytes(b"fake coverage data")
# Mock subprocess
def mock_run(cmd, **kwargs):
class FakeResult:
returncode = 0
# Create output file for HTML
if "html" in cmd:
html_dir = coverage_dir / "htmlcov"
html_dir.mkdir(exist_ok=True)
(html_dir / "index.html").write_text("<html>Coverage</html>")
return FakeResult()
monkeypatch.setattr("subprocess.run", mock_run)
result = provider_with_docker.generate_coverage_report("test-container", "html")
assert isinstance(result, CoverageReportModel)
assert result.format == "html"
assert "htmlcov" in result.report_path
def test_generate_coverage_report_xml(self, provider_with_docker, tmp_path, monkeypatch):
from gnuradio_mcp.models import CoverageReportModel
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
coverage_dir = tmp_path / "test-container"
coverage_dir.mkdir()
(coverage_dir / ".coverage").write_bytes(b"fake coverage data")
def mock_run(cmd, **kwargs):
class FakeResult:
returncode = 0
return FakeResult()
monkeypatch.setattr("subprocess.run", mock_run)
result = provider_with_docker.generate_coverage_report("test-container", "xml")
assert isinstance(result, CoverageReportModel)
assert result.format == "xml"
assert "coverage.xml" in result.report_path
def test_generate_coverage_report_requires_coverage_file(
self, provider_with_docker, tmp_path, monkeypatch
):
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
coverage_dir = tmp_path / "test-container"
coverage_dir.mkdir()
# No .coverage file
with pytest.raises(FileNotFoundError, match="No combined coverage file"):
provider_with_docker.generate_coverage_report("test-container", "html")
def test_combine_coverage(self, provider_with_docker, tmp_path, monkeypatch):
from gnuradio_mcp.models import CoverageDataModel
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
# Create two containers with coverage data
for name in ["container-1", "container-2"]:
coverage_dir = tmp_path / name
coverage_dir.mkdir()
(coverage_dir / ".coverage").write_bytes(b"fake coverage")
def mock_run(cmd, **kwargs):
class FakeResult:
stdout = "TOTAL 200 40 80 20 75%"
stderr = ""
returncode = 0
# Create combined coverage file
if "combine" in cmd:
combined_dir = tmp_path / "combined"
combined_dir.mkdir(exist_ok=True)
(combined_dir / ".coverage").write_bytes(b"combined data")
return FakeResult()
monkeypatch.setattr("subprocess.run", mock_run)
result = provider_with_docker.combine_coverage(["container-1", "container-2"])
assert isinstance(result, CoverageDataModel)
assert result.container_name == "combined"
def test_combine_coverage_requires_names(self, provider_with_docker):
with pytest.raises(ValueError, match="At least one container"):
provider_with_docker.combine_coverage([])
def test_delete_coverage_specific(self, provider_with_docker, tmp_path, monkeypatch):
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
# Create coverage directory
coverage_dir = tmp_path / "test-container"
coverage_dir.mkdir()
(coverage_dir / ".coverage").write_bytes(b"data")
deleted = provider_with_docker.delete_coverage(name="test-container")
assert deleted == 1
assert not coverage_dir.exists()
def test_delete_coverage_older_than(self, provider_with_docker, tmp_path, monkeypatch):
import os
import time
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
# Create old and new coverage directories
old_dir = tmp_path / "old-container"
old_dir.mkdir()
# Set mtime to 10 days ago
old_time = time.time() - (10 * 86400)
os.utime(old_dir, (old_time, old_time))
new_dir = tmp_path / "new-container"
new_dir.mkdir()
deleted = provider_with_docker.delete_coverage(older_than_days=7)
assert deleted == 1
assert not old_dir.exists()
assert new_dir.exists()
def test_delete_coverage_all(self, provider_with_docker, tmp_path, monkeypatch):
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
# Create multiple directories
(tmp_path / "container-1").mkdir()
(tmp_path / "container-2").mkdir()
deleted = provider_with_docker.delete_coverage()
assert deleted == 2
assert not (tmp_path / "container-1").exists()
assert not (tmp_path / "container-2").exists()
def test_delete_coverage_nonexistent(self, provider_with_docker, tmp_path, monkeypatch):
monkeypatch.setattr(
"gnuradio_mcp.providers.runtime.HOST_COVERAGE_BASE", str(tmp_path)
)
# tmp_path exists but is empty
deleted = provider_with_docker.delete_coverage(name="nonexistent")
assert deleted == 0
def test_parse_coverage_summary(self, provider_with_docker):
summary = """Name Stmts Miss Branch BrPart Cover
-----------------------------------------------
module.py 150 30 60 15 80%
other.py 50 20 20 5 60%
-----------------------------------------------
TOTAL 200 50 80 20 75%"""
metrics = provider_with_docker._parse_coverage_summary(summary)
assert metrics["lines_total"] == 200
assert metrics["lines_covered"] == 150 # 200 - 50 missed
assert metrics["coverage_percent"] == 75.0
def test_parse_coverage_summary_no_total(self, provider_with_docker):
summary = "No coverage data collected"
metrics = provider_with_docker._parse_coverage_summary(summary)
assert metrics["lines_total"] is None
assert metrics["lines_covered"] is None
assert metrics["coverage_percent"] is None