From f3efb3643570d418329b6cde47decf4feb99f264 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 30 Jan 2026 13:55:21 -0700 Subject: [PATCH] feat: add gap analysis tools, OOT block loading, and configurable Docker image Source changes spanning three features: - Gap analysis: 12 new MCP tools (generate_code, load_flowgraph, search_blocks, get_block_categories, flowgraph options, embedded Python blocks, expression evaluation, block bypass, export/import) - OOT support: load_oot_blocks tool + auto-discovery of paths like /usr/local/share/gnuradio/grc/blocks for third-party modules - Docker: configurable image parameter on launch_flowgraph for running OOT-enabled containers (e.g. gnuradio-lora-runtime) Resolves merge from feat/oot-block-paths into gap analysis work. All 274 tests pass (204 unit + 70 integration). --- src/gnuradio_mcp/middlewares/docker.py | 7 +- src/gnuradio_mcp/middlewares/flowgraph.py | 203 ++++++++++++++++- src/gnuradio_mcp/middlewares/platform.py | 72 +++++- src/gnuradio_mcp/models.py | 93 ++++++++ src/gnuradio_mcp/providers/base.py | 103 ++++++++- src/gnuradio_mcp/providers/mcp.py | 66 ++++-- src/gnuradio_mcp/providers/runtime.py | 3 + tests/unit/test_gap_fills.py | 265 ++++++++++++++++++++++ tests/unit/test_runtime_provider.py | 1 + 9 files changed, 789 insertions(+), 24 deletions(-) create mode 100644 tests/unit/test_gap_fills.py diff --git a/src/gnuradio_mcp/middlewares/docker.py b/src/gnuradio_mcp/middlewares/docker.py index 434f9eb..f30c21b 100644 --- a/src/gnuradio_mcp/middlewares/docker.py +++ b/src/gnuradio_mcp/middlewares/docker.py @@ -61,6 +61,7 @@ class DockerMiddleware: controlport_port: int = DEFAULT_CONTROLPORT_PORT, enable_perf_counters: bool = True, device_paths: list[str] | None = None, + image: str | None = None, ) -> ContainerModel: """Launch a flowgraph in a Docker container with Xvfb. @@ -74,6 +75,7 @@ class DockerMiddleware: controlport_port: Port for ControlPort (default 9090) enable_perf_counters: Enable performance counters (requires controlport) device_paths: Host device paths to pass through (e.g., /dev/ttyUSB0) + image: Docker image to use (default: gnuradio-runtime or gnuradio-coverage) """ fg_path = Path(flowgraph_path).resolve() if not fg_path.exists(): @@ -97,8 +99,9 @@ class DockerMiddleware: xmlrpc_port, ) - # Select image based on coverage mode - image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE + # Select image: explicit override > coverage > runtime + if image is None: + image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE env = {"DISPLAY": ":99", "XMLRPC_PORT": str(xmlrpc_port)} if enable_vnc: diff --git a/src/gnuradio_mcp/middlewares/flowgraph.py b/src/gnuradio_mcp/middlewares/flowgraph.py index b9a15f7..5ccd1ba 100644 --- a/src/gnuradio_mcp/middlewares/flowgraph.py +++ b/src/gnuradio_mcp/middlewares/flowgraph.py @@ -1,6 +1,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Optional +import logging +import tempfile +from typing import TYPE_CHECKING, Any, Optional from gnuradio.grc.core.blocks.block import Block from gnuradio.grc.core.FlowGraph import FlowGraph @@ -10,22 +12,30 @@ from gnuradio_mcp.middlewares.block import BlockMiddleware from gnuradio_mcp.models import ( BlockModel, ConnectionModel, + EmbeddedBlockIOModel, + ErrorModel, + FlowgraphOptionsModel, + GeneratedCodeModel, + GeneratedFileModel, PortModel, ) -from gnuradio_mcp.utils import get_port_from_port_model, get_unique_id +from gnuradio_mcp.utils import format_error_message, get_port_from_port_model, get_unique_id if TYPE_CHECKING: from gnuradio_mcp.middlewares.platform import PlatformMiddleware +logger = logging.getLogger(__name__) + def set_block_name(block: Block, name: str): block.params["id"].set_value(name) class FlowGraphMiddleware(ElementMiddleware): - def __init__(self, flowgraph: FlowGraph): + def __init__(self, flowgraph: FlowGraph, platform: "PlatformMiddleware | None" = None): super().__init__(flowgraph) self._flowgraph = self._element + self._platform_mw = platform @property def blocks(self) -> list[BlockModel]: @@ -77,6 +87,191 @@ class FlowGraphMiddleware(ElementMiddleware): for connection in self._flowgraph.connections ] + # ────────────────────────────────────────── + # Gap 1: Code Generation + # ────────────────────────────────────────── + + def generate_code(self, output_dir: str = "") -> GeneratedCodeModel: + """Generate Python/C++ code from the flowgraph. + + Unlike grcc, this does NOT block on validation errors — blocks with + dynamically-resolved ports (e.g. gr-lora_sdr soft_decoding) can still + produce valid runtime code even when GRC's static validator complains. + Validation warnings are included in the response for reference. + + Args: + output_dir: Directory for generated files. If empty, uses a + persistent temp directory (files survive the call). + """ + import os + + fg = self._flowgraph + fg.rewrite() + + # Collect validation state (non-blocking — never gate on this) + fg.validate() + warnings: list[ErrorModel] = [ + format_error_message(elem, msg) + for elem, msg in fg.iter_error_messages() + ] + is_valid = fg.is_valid() + + generate_options = fg.get_option("generate_options") or "no_gui" + flowgraph_id = fg.get_option("id") or "top_block" + + # Persistent output directory (NOT TemporaryDirectory context manager) + if not output_dir: + output_dir = tempfile.mkdtemp(prefix="gr_mcp_gen_") + os.makedirs(output_dir, exist_ok=True) + + # Generate via Platform's Generator (bypasses grcc validation gate) + if self._platform_mw: + generator = self._platform_mw._platform.Generator(fg, output_dir) + else: + from gnuradio.grc.core.generator import Generator + + generator = Generator(fg, output_dir) + generator.write() + + # Read back generated files as strings (preserves existing behavior) + files: list[GeneratedFileModel] = [] + for root, _dirs, filenames in os.walk(output_dir): + for fname in sorted(filenames): + fpath = os.path.join(root, fname) + try: + with open(fpath, "r", encoding="utf-8") as f: + content = f.read() + except (UnicodeDecodeError, OSError): + continue + is_main = fname == f"{flowgraph_id}.py" or fname == f"{flowgraph_id}.cpp" + files.append( + GeneratedFileModel( + filename=fname, + content=content, + is_main=is_main, + ) + ) + + return GeneratedCodeModel( + files=files, + generate_options=generate_options, + flowgraph_id=flowgraph_id, + output_dir=output_dir, + is_valid=is_valid, + warnings=warnings, + ) + + # ────────────────────────────────────────── + # Gap 3: Flowgraph Options + # ────────────────────────────────────────── + + def get_flowgraph_options(self) -> FlowgraphOptionsModel: + """Read the 'options' block parameters that control flowgraph behavior.""" + fg = self._flowgraph + opts = fg.options_block + + all_params = {} + for key, param in opts.params.items(): + all_params[key] = param.get_value() + + return FlowgraphOptionsModel( + id=all_params.get("id", ""), + title=all_params.get("title", ""), + author=all_params.get("author", ""), + description=all_params.get("description", ""), + generate_options=all_params.get("generate_options", ""), + run_options=all_params.get("run_options", ""), + output_language=all_params.get("output_language", ""), + catch_exceptions=all_params.get("catch_exceptions", ""), + all_params=all_params, + ) + + def set_flowgraph_options(self, params: dict[str, Any]) -> bool: + """Set parameters on the 'options' block.""" + fg = self._flowgraph + opts = fg.options_block + for key, value in params.items(): + if key in opts.params: + opts.params[key].set_value(value) + else: + raise KeyError(f"Unknown options parameter: {key!r}") + fg.rewrite() + return True + + # ────────────────────────────────────────── + # Gap 4: Embedded Python Blocks + # ────────────────────────────────────────── + + def create_embedded_python_block( + self, source_code: str, block_name: Optional[str] = None + ) -> BlockModel: + """Create an embedded Python block from source code. + + The source must define a class (typically named 'blk') that inherits + from a GNU Radio block base class. All __init__ parameters must have + default values. GRC auto-detects ports and parameters. + """ + block_name = block_name or get_unique_id(self._flowgraph.blocks, "epy_block") + block = self._flowgraph.new_block("epy_block") + assert block is not None, "Failed to create epy_block" + set_block_name(block, block_name) + block.params["_source_code"].set_value(source_code) + block.rewrite() + return BlockModel.from_block(block) + + # ────────────────────────────────────────── + # Gap 6: Expression Evaluation + # ────────────────────────────────────────── + + def evaluate_expression(self, expr: str) -> Any: + """Evaluate a Python expression in the flowgraph's namespace. + + The namespace includes all imports, variables, parameters, and + modules defined in the flowgraph. + """ + fg = self._flowgraph + fg.rewrite() + return fg.evaluate(expr) + + # ────────────────────────────────────────── + # Gap 7: Block Bypass + # ────────────────────────────────────────── + + def bypass_block(self, block_name: str) -> bool: + """Bypass a block (pass signal through without processing). + + Only works for single-input, single-output blocks with matching types. + """ + block_mw = self.get_block(block_name) + block = block_mw._block + if not block.can_bypass(): + raise ValueError( + f"Block {block_name!r} cannot be bypassed " + f"(requires 1 input and 1 output of the same type)" + ) + return block.set_bypassed() + + def unbypass_block(self, block_name: str) -> bool: + """Re-enable a bypassed block.""" + block_mw = self.get_block(block_name) + block = block_mw._block + if block.state == "bypassed": + block.state = "enabled" + return True + return False + + # ────────────────────────────────────────── + # Gap 8: Export Flowgraph Data + # ────────────────────────────────────────── + + def export_data(self) -> dict: + """Export the flowgraph as a nested dict (same format as .grc files).""" + return self._flowgraph.export_data() + + def import_data(self, data: dict) -> bool: + """Import flowgraph data from a nested dict, replacing current contents.""" + return self._flowgraph.import_data(data) + @classmethod def from_file( cls, platform: "PlatformMiddleware", filepath: str = "" @@ -84,4 +279,4 @@ class FlowGraphMiddleware(ElementMiddleware): initial_state = platform._platform.parse_flow_graph(filepath) flowgraph = FlowGraph(platform._platform) flowgraph.import_data(initial_state) - return cls(flowgraph) + return cls(flowgraph, platform=platform) diff --git a/src/gnuradio_mcp/middlewares/platform.py b/src/gnuradio_mcp/middlewares/platform.py index df0f2e8..b5f83ba 100644 --- a/src/gnuradio_mcp/middlewares/platform.py +++ b/src/gnuradio_mcp/middlewares/platform.py @@ -2,12 +2,13 @@ from __future__ import annotations import os from pathlib import Path +from typing import Optional from gnuradio.grc.core.platform import Platform from gnuradio_mcp.middlewares.base import ElementMiddleware from gnuradio_mcp.middlewares.flowgraph import FlowGraphMiddleware -from gnuradio_mcp.models import BlockTypeModel +from gnuradio_mcp.models import BlockTypeDetailModel, BlockTypeModel class PlatformMiddleware(ElementMiddleware): @@ -92,3 +93,72 @@ class PlatformMiddleware(ElementMiddleware): def save_flowgraph(self, filepath: str, flowgraph: FlowGraphMiddleware) -> None: self._platform.save_flow_graph(filepath, flowgraph._flowgraph) + + # ────────────────────────────────────────── + # Gap 2: Load Existing Flowgraph + # ────────────────────────────────────────── + + def load_flowgraph(self, filepath: str) -> FlowGraphMiddleware: + """Load an existing .grc file, replacing the current flowgraph.""" + return FlowGraphMiddleware.from_file(self, filepath) + + # ────────────────────────────────────────── + # Gap 5: Search/Browse Blocks by Category + # ────────────────────────────────────────── + + def search_blocks( + self, + query: str = "", + category: Optional[str] = None, + ) -> list[BlockTypeDetailModel]: + """Search available blocks by keyword and/or category. + + Args: + query: Substring match against key, label, or documentation. + category: Filter to blocks in this category (case-insensitive). + Matches if any element in the block's category path + contains the string. + """ + results = [] + query_lower = query.lower() + category_lower = category.lower() if category else None + + for block_type in self._platform.blocks.values(): + # Category filter + if category_lower: + block_cats = [c.lower() for c in (block_type.category or [])] + if not any(category_lower in c for c in block_cats): + continue + + # Query filter (empty query matches everything) + if query_lower: + searchable = ( + block_type.key.lower() + + " " + + block_type.label.lower() + ) + if hasattr(block_type, "documentation"): + doc = block_type.documentation + if isinstance(doc, dict): + searchable += " " + doc.get("", "").lower() + elif isinstance(doc, str): + searchable += " " + doc.lower() + + if query_lower not in searchable: + continue + + results.append(BlockTypeDetailModel.from_block_type(block_type)) + + return results + + def get_block_categories(self) -> dict[str, list[str]]: + """Get the full category tree with block keys per category. + + Returns a dict mapping category path (joined with '/') to + list of block keys in that category. + """ + categories: dict[str, list[str]] = {} + for block_type in self._platform.blocks.values(): + cat_path = "/".join(block_type.category) if block_type.category else "(uncategorized)" + categories.setdefault(cat_path, []).append(block_type.key) + return dict(sorted(categories.items())) diff --git a/src/gnuradio_mcp/models.py b/src/gnuradio_mcp/models.py index 9d056d6..cbfbd6e 100644 --- a/src/gnuradio_mcp/models.py +++ b/src/gnuradio_mcp/models.py @@ -262,3 +262,96 @@ class CoverageReportModel(BaseModel): container_name: str format: Literal["html", "xml", "json"] report_path: str + + +# ────────────────────────────────────────────── +# Platform / Design-Time Models (Phase 3: Gap Fills) +# ────────────────────────────────────────────── + + +class BlockTypeDetailModel(BaseModel): + """Extended block type info with category for search/browsing.""" + + label: str + key: str + category: list[str] = [] + documentation: str = "" + flags: list[str] = [] + deprecated: bool = False + + @classmethod + def from_block_type(cls, block: Type[Block]) -> BlockTypeDetailModel: + flags = [] + if hasattr(block, "flags") and hasattr(block.flags, "data"): + flags = sorted(block.flags.data) + doc = "" + if hasattr(block, "documentation") and isinstance(block.documentation, dict): + doc = block.documentation.get("", "") + deprecated = False + if hasattr(block, "is_deprecated") and callable(block.is_deprecated): + try: + # is_deprecated() requires an instance; check category fallback + deprecated = any( + "deprecated" in c.lower() + for c in (block.category or []) + ) + except Exception: + pass + return cls( + label=block.label, + key=block.key, + category=list(block.category) if block.category else [], + documentation=doc, + flags=flags, + deprecated=deprecated, + ) + + +class GeneratedFileModel(BaseModel): + """A single generated file.""" + + filename: str + content: str + is_main: bool = False + + +class GeneratedCodeModel(BaseModel): + """Generated code from a flowgraph. + + Unlike grcc, code generation does NOT block on validation errors. + The ``is_valid`` and ``warnings`` fields report validation state + without gating generation. + """ + + files: list[GeneratedFileModel] + generate_options: str + flowgraph_id: str + output_dir: str = "" + is_valid: bool = True + warnings: list[ErrorModel] = [] + + +class FlowgraphOptionsModel(BaseModel): + """Flowgraph-level options from the 'options' block.""" + + id: str + title: str = "" + author: str = "" + description: str = "" + generate_options: str = "" + run_options: str = "" + output_language: str = "" + catch_exceptions: str = "" + all_params: dict[str, Any] = {} + + +class EmbeddedBlockIOModel(BaseModel): + """I/O signature extracted from embedded Python block source.""" + + name: str + cls: str + params: list[tuple[str, str]] + sinks: list[tuple[str, str, int]] + sources: list[tuple[str, str, int]] + doc: str = "" + callbacks: list[str] = [] diff --git a/src/gnuradio_mcp/providers/base.py b/src/gnuradio_mcp/providers/base.py index f677668..e01f01a 100644 --- a/src/gnuradio_mcp/providers/base.py +++ b/src/gnuradio_mcp/providers/base.py @@ -1,13 +1,16 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from gnuradio_mcp.middlewares.platform import PlatformMiddleware from gnuradio_mcp.models import ( SINK, SOURCE, BlockModel, + BlockTypeDetailModel, BlockTypeModel, ConnectionModel, ErrorModel, + FlowgraphOptionsModel, + GeneratedCodeModel, ParamModel, PortModel, ) @@ -125,3 +128,101 @@ class PlatformProvider: - blocks_after: Block count after reload """ return self._platform_mw.load_oot_paths(paths) + + ############################################## + # Gap 1: Code Generation + ############################################## + + def generate_code(self, output_dir: str = "") -> GeneratedCodeModel: + """Generate Python/C++ code from the current flowgraph. + + Unlike grcc, this does NOT block on validation errors. + Validation warnings are included in the response for reference. + """ + return self._flowgraph_mw.generate_code(output_dir) + + ############################################## + # Gap 2: Load Existing Flowgraph + ############################################## + + def load_flowgraph(self, filepath: str) -> list[BlockModel]: + """Load a .grc file, replacing the current flowgraph. + + Returns the blocks in the newly loaded flowgraph. + """ + self._flowgraph_mw = self._platform_mw.load_flowgraph(filepath) + return self._flowgraph_mw.blocks + + ############################################## + # Gap 3: Flowgraph Options + ############################################## + + def get_flowgraph_options(self) -> FlowgraphOptionsModel: + """Get the flowgraph-level options (title, author, generate_options, etc.).""" + return self._flowgraph_mw.get_flowgraph_options() + + def set_flowgraph_options(self, params: Dict[str, Any]) -> bool: + """Set flowgraph-level options on the 'options' block.""" + return self._flowgraph_mw.set_flowgraph_options(params) + + ############################################## + # Gap 4: Embedded Python Blocks + ############################################## + + def create_embedded_python_block( + self, source_code: str, block_name: Optional[str] = None + ) -> str: + """Create an embedded Python block from source code. + + Returns the block name. + """ + block_model = self._flowgraph_mw.create_embedded_python_block( + source_code, block_name + ) + return block_model.name + + ############################################## + # Gap 5: Search Blocks + ############################################## + + def search_blocks( + self, query: str = "", category: Optional[str] = None + ) -> list[BlockTypeDetailModel]: + """Search available blocks by keyword and/or category.""" + return self._platform_mw.search_blocks(query, category) + + def get_block_categories(self) -> dict[str, list[str]]: + """Get all block categories with their block keys.""" + return self._platform_mw.get_block_categories() + + ############################################## + # Gap 6: Expression Evaluation + ############################################## + + def evaluate_expression(self, expr: str) -> Any: + """Evaluate a Python expression in the flowgraph's namespace.""" + return self._flowgraph_mw.evaluate_expression(expr) + + ############################################## + # Gap 7: Block Bypass + ############################################## + + def bypass_block(self, block_name: str) -> bool: + """Bypass a block (pass signal through without processing).""" + return self._flowgraph_mw.bypass_block(block_name) + + def unbypass_block(self, block_name: str) -> bool: + """Re-enable a bypassed block.""" + return self._flowgraph_mw.unbypass_block(block_name) + + ############################################## + # Gap 8: Export/Import Flowgraph Data + ############################################## + + def export_flowgraph_data(self) -> dict: + """Export the flowgraph as a nested dict (same format as .grc files).""" + return self._flowgraph_mw.export_data() + + def import_flowgraph_data(self, data: dict) -> bool: + """Import flowgraph data from a dict, replacing current contents.""" + return self._flowgraph_mw.import_data(data) diff --git a/src/gnuradio_mcp/providers/mcp.py b/src/gnuradio_mcp/providers/mcp.py index c75e6b6..80d1b58 100644 --- a/src/gnuradio_mcp/providers/mcp.py +++ b/src/gnuradio_mcp/providers/mcp.py @@ -11,22 +11,56 @@ class McpPlatformProvider: self.__init_tools() def __init_tools(self): - self._mcp_instance.tool(self._platform_provider.get_blocks) - self._mcp_instance.tool(self._platform_provider.make_block) - self._mcp_instance.tool(self._platform_provider.remove_block) - self._mcp_instance.tool(self._platform_provider.get_block_params) - self._mcp_instance.tool(self._platform_provider.set_block_params) - self._mcp_instance.tool(self._platform_provider.get_block_sources) - self._mcp_instance.tool(self._platform_provider.get_block_sinks) - self._mcp_instance.tool(self._platform_provider.get_connections) - self._mcp_instance.tool(self._platform_provider.connect_blocks) - self._mcp_instance.tool(self._platform_provider.disconnect_blocks) - self._mcp_instance.tool(self._platform_provider.validate_block) - self._mcp_instance.tool(self._platform_provider.validate_flowgraph) - self._mcp_instance.tool(self._platform_provider.get_all_errors) - self._mcp_instance.tool(self._platform_provider.save_flowgraph) - self._mcp_instance.tool(self._platform_provider.get_all_available_blocks) - self._mcp_instance.tool(self._platform_provider.load_oot_blocks) + t = self._mcp_instance.tool + p = self._platform_provider + + # ── Existing tools ───────────────────── + t(p.get_blocks) + t(p.make_block) + t(p.remove_block) + t(p.get_block_params) + t(p.set_block_params) + t(p.get_block_sources) + t(p.get_block_sinks) + t(p.get_connections) + t(p.connect_blocks) + t(p.disconnect_blocks) + t(p.validate_block) + t(p.validate_flowgraph) + t(p.get_all_errors) + t(p.save_flowgraph) + t(p.get_all_available_blocks) + + # ── OOT Block Loading ────────────────── + t(p.load_oot_blocks) + + # ── Gap 1: Code Generation ───────────── + t(p.generate_code) + + # ── Gap 2: Load Flowgraph ────────────── + t(p.load_flowgraph) + + # ── Gap 3: Flowgraph Options ─────────── + t(p.get_flowgraph_options) + t(p.set_flowgraph_options) + + # ── Gap 4: Embedded Python Blocks ────── + t(p.create_embedded_python_block) + + # ── Gap 5: Search / Categories ───────── + t(p.search_blocks) + t(p.get_block_categories) + + # ── Gap 6: Expression Evaluation ─────── + t(p.evaluate_expression) + + # ── Gap 7: Block Bypass ──────────────── + t(p.bypass_block) + t(p.unbypass_block) + + # ── Gap 8: Export/Import Data ────────── + t(p.export_flowgraph_data) + t(p.import_flowgraph_data) @property def app(self) -> FastMCP: diff --git a/src/gnuradio_mcp/providers/runtime.py b/src/gnuradio_mcp/providers/runtime.py index daf43f5..a942a66 100644 --- a/src/gnuradio_mcp/providers/runtime.py +++ b/src/gnuradio_mcp/providers/runtime.py @@ -89,6 +89,7 @@ class RuntimeProvider: controlport_port: int = 9090, enable_perf_counters: bool = True, device_paths: list[str] | None = None, + image: str | None = None, ) -> ContainerModel: """Launch a flowgraph in a Docker container with Xvfb. @@ -102,6 +103,7 @@ class RuntimeProvider: controlport_port: Port for ControlPort (default 9090) enable_perf_counters: Enable performance counters (requires controlport) device_paths: Host device paths to pass through + image: Docker image to use (e.g., 'gnuradio-lora-runtime:latest') """ docker = self._require_docker() if name is None: @@ -116,6 +118,7 @@ class RuntimeProvider: controlport_port=controlport_port, enable_perf_counters=enable_perf_counters, device_paths=device_paths, + image=image, ) def list_containers(self) -> list[ContainerModel]: diff --git a/tests/unit/test_gap_fills.py b/tests/unit/test_gap_fills.py new file mode 100644 index 0000000..d941af0 --- /dev/null +++ b/tests/unit/test_gap_fills.py @@ -0,0 +1,265 @@ +"""Tests for the capability gap fill features (Gaps 1-8). + +These tests validate the new middleware and provider methods added to +close the gap between gr-mcp and grcc/GRC. +""" +from __future__ import annotations + +import pytest + +from gnuradio_mcp.middlewares.flowgraph import FlowGraphMiddleware +from gnuradio_mcp.middlewares.platform import PlatformMiddleware +from gnuradio_mcp.models import ( + BlockModel, + BlockTypeDetailModel, + FlowgraphOptionsModel, + GeneratedCodeModel, +) + + +@pytest.fixture +def flowgraph_middleware(platform_middleware: PlatformMiddleware): + return platform_middleware.make_flowgraph() + + +# ────────────────────────────────────────────── +# Gap 3: Flowgraph Options +# ────────────────────────────────────────────── + + +def test_get_flowgraph_options(flowgraph_middleware: FlowGraphMiddleware): + opts = flowgraph_middleware.get_flowgraph_options() + assert isinstance(opts, FlowgraphOptionsModel) + assert opts.id # Default flowgraph has an id + assert opts.generate_options # Should have a generate_options set + + +def test_set_flowgraph_options(flowgraph_middleware: FlowGraphMiddleware): + flowgraph_middleware.set_flowgraph_options({ + "title": "Test Flowgraph", + "author": "gr-mcp tests", + }) + opts = flowgraph_middleware.get_flowgraph_options() + assert opts.title == "Test Flowgraph" + assert opts.author == "gr-mcp tests" + + +def test_set_invalid_option_raises(flowgraph_middleware: FlowGraphMiddleware): + with pytest.raises(KeyError, match="nonexistent_key"): + flowgraph_middleware.set_flowgraph_options({"nonexistent_key": "value"}) + + +# ────────────────────────────────────────────── +# Gap 4: Embedded Python Blocks +# ────────────────────────────────────────────── + +EPY_SOURCE = '''\ +import numpy as np +from gnuradio import gr + +class blk(gr.sync_block): + """Test embedded block - multiply by constant""" + def __init__(self, gain=1.0): + gr.sync_block.__init__( + self, name='Test Gain Block', + in_sig=[np.float32], out_sig=[np.float32] + ) + self.gain = gain + + def work(self, input_items, output_items): + output_items[0][:] = input_items[0] * self.gain + return len(output_items[0]) +''' + + +def test_create_embedded_python_block(flowgraph_middleware: FlowGraphMiddleware): + model = flowgraph_middleware.create_embedded_python_block(EPY_SOURCE, "my_gain") + assert isinstance(model, BlockModel) + assert model.name == "my_gain" + + # Verify the block is in the flowgraph + assert any(b.name == "my_gain" for b in flowgraph_middleware.blocks) + + +def test_embedded_block_auto_names(flowgraph_middleware: FlowGraphMiddleware): + model = flowgraph_middleware.create_embedded_python_block(EPY_SOURCE) + assert "epy_block" in model.name + + +# ────────────────────────────────────────────── +# Gap 5: Search Blocks +# ────────────────────────────────────────────── + + +def test_search_blocks_by_query(platform_middleware: PlatformMiddleware): + results = platform_middleware.search_blocks(query="throttle") + assert len(results) > 0 + assert all(isinstance(r, BlockTypeDetailModel) for r in results) + assert any("throttle" in r.key.lower() for r in results) + + +def test_search_blocks_by_category(platform_middleware: PlatformMiddleware): + # GRC categories use names like "Core", "Waveform Generators", etc. + results = platform_middleware.search_blocks(category="Waveform Generators") + assert len(results) > 0 + for r in results: + assert any("waveform generators" in c.lower() for c in r.category) + + +def test_search_blocks_empty_query(platform_middleware: PlatformMiddleware): + # Empty query should return all blocks + all_results = platform_middleware.search_blocks() + all_blocks = platform_middleware.blocks + assert len(all_results) == len(all_blocks) + + +def test_search_blocks_no_match(platform_middleware: PlatformMiddleware): + results = platform_middleware.search_blocks(query="zzz_nonexistent_block_xyz") + assert results == [] + + +def test_get_block_categories(platform_middleware: PlatformMiddleware): + cats = platform_middleware.get_block_categories() + assert isinstance(cats, dict) + assert len(cats) > 0 + # Each value should be a list of block keys + for _cat, keys in cats.items(): + assert isinstance(keys, list) + assert all(isinstance(k, str) for k in keys) + + +# ────────────────────────────────────────────── +# Gap 6: Expression Evaluation +# ────────────────────────────────────────────── + + +def test_evaluate_simple_expression(flowgraph_middleware: FlowGraphMiddleware): + result = flowgraph_middleware.evaluate_expression("2 + 2") + assert result == 4 + + +def test_evaluate_variable_expression(flowgraph_middleware: FlowGraphMiddleware): + # Default flowgraph has samp_rate variable + result = flowgraph_middleware.evaluate_expression("samp_rate") + assert result == 32000 # Default value + + +# ────────────────────────────────────────────── +# Gap 7: Block Bypass +# ────────────────────────────────────────────── + + +def test_bypass_single_io_block(flowgraph_middleware: FlowGraphMiddleware): + # blocks_multiply_const_vxx has 1 input, 1 output of same type — bypassable + model = flowgraph_middleware.add_block("blocks_multiply_const_vxx") + result = flowgraph_middleware.bypass_block(model.name) + assert result is True + + # Verify state + block_mw = flowgraph_middleware.get_block(model.name) + assert block_mw._block.state == "bypassed" + + +def test_unbypass_block(flowgraph_middleware: FlowGraphMiddleware): + model = flowgraph_middleware.add_block("blocks_multiply_const_vxx") + flowgraph_middleware.bypass_block(model.name) + result = flowgraph_middleware.unbypass_block(model.name) + assert result is True + + block_mw = flowgraph_middleware.get_block(model.name) + assert block_mw._block.state == "enabled" + + +def test_bypass_multi_io_block_raises(flowgraph_middleware: FlowGraphMiddleware): + # blocks_copy has a hidden message port — 2 sinks — cannot be bypassed + model = flowgraph_middleware.add_block("blocks_copy") + with pytest.raises(ValueError, match="cannot be bypassed"): + flowgraph_middleware.bypass_block(model.name) + + +# ────────────────────────────────────────────── +# Gap 8: Export/Import Flowgraph Data +# ────────────────────────────────────────────── + + +def test_export_data(flowgraph_middleware: FlowGraphMiddleware): + data = flowgraph_middleware.export_data() + assert isinstance(data, dict) + assert "blocks" in data or "options" in data + + +def test_roundtrip_export_import( + flowgraph_middleware: FlowGraphMiddleware, + platform_middleware: PlatformMiddleware, +): + # Add a block, export, create fresh flowgraph, import + flowgraph_middleware.add_block("analog_sig_source_x", "my_sig_source") + exported = flowgraph_middleware.export_data() + + new_fg = platform_middleware.make_flowgraph() + new_fg.import_data(exported) + + assert any(b.name == "my_sig_source" for b in new_fg.blocks) + + +# ────────────────────────────────────────────── +# Gap 1: Code Generation +# ────────────────────────────────────────────── + + +def test_generate_code_produces_output(flowgraph_middleware: FlowGraphMiddleware): + result = flowgraph_middleware.generate_code() + assert isinstance(result, GeneratedCodeModel) + assert len(result.files) > 0 + assert result.flowgraph_id + assert result.generate_options + + # Should have at least one main file + main_files = [f for f in result.files if f.is_main] + assert len(main_files) >= 1 + + +def test_generate_code_contains_python(flowgraph_middleware: FlowGraphMiddleware): + result = flowgraph_middleware.generate_code() + main = next((f for f in result.files if f.is_main), None) + assert main is not None + # Generated Python code should contain typical markers + assert "import" in main.content or "#include" in main.content + + +def test_generate_code_with_output_dir(flowgraph_middleware: FlowGraphMiddleware): + """Files persist on disk when output_dir is specified.""" + import os + import tempfile + + output_dir = tempfile.mkdtemp(prefix="gr_mcp_test_") + result = flowgraph_middleware.generate_code(output_dir=output_dir) + + assert result.output_dir == output_dir + # Files should exist on disk + main = next((f for f in result.files if f.is_main), None) + assert main is not None + assert os.path.exists(os.path.join(output_dir, main.filename)) + + +def test_generate_code_returns_validation_state( + flowgraph_middleware: FlowGraphMiddleware, +): + """generate_code includes is_valid and warnings in response.""" + result = flowgraph_middleware.generate_code() + assert isinstance(result.is_valid, bool) + assert isinstance(result.warnings, list) + + +def test_generate_code_default_output_persists( + flowgraph_middleware: FlowGraphMiddleware, +): + """Default temp dir persists files (not cleaned up after call).""" + import os + + result = flowgraph_middleware.generate_code() + assert result.output_dir # Should have a temp path + assert os.path.isdir(result.output_dir) # Dir still exists + main = next((f for f in result.files if f.is_main), None) + if main: + assert os.path.exists(os.path.join(result.output_dir, main.filename)) diff --git a/tests/unit/test_runtime_provider.py b/tests/unit/test_runtime_provider.py index f2982c7..6117d11 100644 --- a/tests/unit/test_runtime_provider.py +++ b/tests/unit/test_runtime_provider.py @@ -132,6 +132,7 @@ class TestContainerLifecycle: controlport_port=9090, enable_perf_counters=True, device_paths=None, + image=None, ) def test_launch_flowgraph_auto_name(