feat: add gap analysis tools, OOT block loading, and configurable Docker image

Source changes spanning three features:

- Gap analysis: 12 new MCP tools (generate_code, load_flowgraph,
  search_blocks, get_block_categories, flowgraph options, embedded
  Python blocks, expression evaluation, block bypass, export/import)
- OOT support: load_oot_blocks tool + auto-discovery of paths like
  /usr/local/share/gnuradio/grc/blocks for third-party modules
- Docker: configurable image parameter on launch_flowgraph for
  running OOT-enabled containers (e.g. gnuradio-lora-runtime)

Resolves merge from feat/oot-block-paths into gap analysis work.
All 274 tests pass (204 unit + 70 integration).
This commit is contained in:
Ryan Malloy 2026-01-30 13:55:21 -07:00
parent 822dfffcb8
commit f3efb36435
9 changed files with 789 additions and 24 deletions

View File

@ -61,6 +61,7 @@ class DockerMiddleware:
controlport_port: int = DEFAULT_CONTROLPORT_PORT,
enable_perf_counters: bool = True,
device_paths: list[str] | None = None,
image: str | None = None,
) -> ContainerModel:
"""Launch a flowgraph in a Docker container with Xvfb.
@ -74,6 +75,7 @@ class DockerMiddleware:
controlport_port: Port for ControlPort (default 9090)
enable_perf_counters: Enable performance counters (requires controlport)
device_paths: Host device paths to pass through (e.g., /dev/ttyUSB0)
image: Docker image to use (default: gnuradio-runtime or gnuradio-coverage)
"""
fg_path = Path(flowgraph_path).resolve()
if not fg_path.exists():
@ -97,8 +99,9 @@ class DockerMiddleware:
xmlrpc_port,
)
# Select image based on coverage mode
image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE
# Select image: explicit override > coverage > runtime
if image is None:
image = COVERAGE_IMAGE if enable_coverage else RUNTIME_IMAGE
env = {"DISPLAY": ":99", "XMLRPC_PORT": str(xmlrpc_port)}
if enable_vnc:

View File

@ -1,6 +1,8 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
import logging
import tempfile
from typing import TYPE_CHECKING, Any, Optional
from gnuradio.grc.core.blocks.block import Block
from gnuradio.grc.core.FlowGraph import FlowGraph
@ -10,22 +12,30 @@ from gnuradio_mcp.middlewares.block import BlockMiddleware
from gnuradio_mcp.models import (
BlockModel,
ConnectionModel,
EmbeddedBlockIOModel,
ErrorModel,
FlowgraphOptionsModel,
GeneratedCodeModel,
GeneratedFileModel,
PortModel,
)
from gnuradio_mcp.utils import get_port_from_port_model, get_unique_id
from gnuradio_mcp.utils import format_error_message, get_port_from_port_model, get_unique_id
if TYPE_CHECKING:
from gnuradio_mcp.middlewares.platform import PlatformMiddleware
logger = logging.getLogger(__name__)
def set_block_name(block: Block, name: str):
block.params["id"].set_value(name)
class FlowGraphMiddleware(ElementMiddleware):
def __init__(self, flowgraph: FlowGraph):
def __init__(self, flowgraph: FlowGraph, platform: "PlatformMiddleware | None" = None):
super().__init__(flowgraph)
self._flowgraph = self._element
self._platform_mw = platform
@property
def blocks(self) -> list[BlockModel]:
@ -77,6 +87,191 @@ class FlowGraphMiddleware(ElementMiddleware):
for connection in self._flowgraph.connections
]
# ──────────────────────────────────────────
# Gap 1: Code Generation
# ──────────────────────────────────────────
def generate_code(self, output_dir: str = "") -> GeneratedCodeModel:
"""Generate Python/C++ code from the flowgraph.
Unlike grcc, this does NOT block on validation errors blocks with
dynamically-resolved ports (e.g. gr-lora_sdr soft_decoding) can still
produce valid runtime code even when GRC's static validator complains.
Validation warnings are included in the response for reference.
Args:
output_dir: Directory for generated files. If empty, uses a
persistent temp directory (files survive the call).
"""
import os
fg = self._flowgraph
fg.rewrite()
# Collect validation state (non-blocking — never gate on this)
fg.validate()
warnings: list[ErrorModel] = [
format_error_message(elem, msg)
for elem, msg in fg.iter_error_messages()
]
is_valid = fg.is_valid()
generate_options = fg.get_option("generate_options") or "no_gui"
flowgraph_id = fg.get_option("id") or "top_block"
# Persistent output directory (NOT TemporaryDirectory context manager)
if not output_dir:
output_dir = tempfile.mkdtemp(prefix="gr_mcp_gen_")
os.makedirs(output_dir, exist_ok=True)
# Generate via Platform's Generator (bypasses grcc validation gate)
if self._platform_mw:
generator = self._platform_mw._platform.Generator(fg, output_dir)
else:
from gnuradio.grc.core.generator import Generator
generator = Generator(fg, output_dir)
generator.write()
# Read back generated files as strings (preserves existing behavior)
files: list[GeneratedFileModel] = []
for root, _dirs, filenames in os.walk(output_dir):
for fname in sorted(filenames):
fpath = os.path.join(root, fname)
try:
with open(fpath, "r", encoding="utf-8") as f:
content = f.read()
except (UnicodeDecodeError, OSError):
continue
is_main = fname == f"{flowgraph_id}.py" or fname == f"{flowgraph_id}.cpp"
files.append(
GeneratedFileModel(
filename=fname,
content=content,
is_main=is_main,
)
)
return GeneratedCodeModel(
files=files,
generate_options=generate_options,
flowgraph_id=flowgraph_id,
output_dir=output_dir,
is_valid=is_valid,
warnings=warnings,
)
# ──────────────────────────────────────────
# Gap 3: Flowgraph Options
# ──────────────────────────────────────────
def get_flowgraph_options(self) -> FlowgraphOptionsModel:
"""Read the 'options' block parameters that control flowgraph behavior."""
fg = self._flowgraph
opts = fg.options_block
all_params = {}
for key, param in opts.params.items():
all_params[key] = param.get_value()
return FlowgraphOptionsModel(
id=all_params.get("id", ""),
title=all_params.get("title", ""),
author=all_params.get("author", ""),
description=all_params.get("description", ""),
generate_options=all_params.get("generate_options", ""),
run_options=all_params.get("run_options", ""),
output_language=all_params.get("output_language", ""),
catch_exceptions=all_params.get("catch_exceptions", ""),
all_params=all_params,
)
def set_flowgraph_options(self, params: dict[str, Any]) -> bool:
"""Set parameters on the 'options' block."""
fg = self._flowgraph
opts = fg.options_block
for key, value in params.items():
if key in opts.params:
opts.params[key].set_value(value)
else:
raise KeyError(f"Unknown options parameter: {key!r}")
fg.rewrite()
return True
# ──────────────────────────────────────────
# Gap 4: Embedded Python Blocks
# ──────────────────────────────────────────
def create_embedded_python_block(
self, source_code: str, block_name: Optional[str] = None
) -> BlockModel:
"""Create an embedded Python block from source code.
The source must define a class (typically named 'blk') that inherits
from a GNU Radio block base class. All __init__ parameters must have
default values. GRC auto-detects ports and parameters.
"""
block_name = block_name or get_unique_id(self._flowgraph.blocks, "epy_block")
block = self._flowgraph.new_block("epy_block")
assert block is not None, "Failed to create epy_block"
set_block_name(block, block_name)
block.params["_source_code"].set_value(source_code)
block.rewrite()
return BlockModel.from_block(block)
# ──────────────────────────────────────────
# Gap 6: Expression Evaluation
# ──────────────────────────────────────────
def evaluate_expression(self, expr: str) -> Any:
"""Evaluate a Python expression in the flowgraph's namespace.
The namespace includes all imports, variables, parameters, and
modules defined in the flowgraph.
"""
fg = self._flowgraph
fg.rewrite()
return fg.evaluate(expr)
# ──────────────────────────────────────────
# Gap 7: Block Bypass
# ──────────────────────────────────────────
def bypass_block(self, block_name: str) -> bool:
"""Bypass a block (pass signal through without processing).
Only works for single-input, single-output blocks with matching types.
"""
block_mw = self.get_block(block_name)
block = block_mw._block
if not block.can_bypass():
raise ValueError(
f"Block {block_name!r} cannot be bypassed "
f"(requires 1 input and 1 output of the same type)"
)
return block.set_bypassed()
def unbypass_block(self, block_name: str) -> bool:
"""Re-enable a bypassed block."""
block_mw = self.get_block(block_name)
block = block_mw._block
if block.state == "bypassed":
block.state = "enabled"
return True
return False
# ──────────────────────────────────────────
# Gap 8: Export Flowgraph Data
# ──────────────────────────────────────────
def export_data(self) -> dict:
"""Export the flowgraph as a nested dict (same format as .grc files)."""
return self._flowgraph.export_data()
def import_data(self, data: dict) -> bool:
"""Import flowgraph data from a nested dict, replacing current contents."""
return self._flowgraph.import_data(data)
@classmethod
def from_file(
cls, platform: "PlatformMiddleware", filepath: str = ""
@ -84,4 +279,4 @@ class FlowGraphMiddleware(ElementMiddleware):
initial_state = platform._platform.parse_flow_graph(filepath)
flowgraph = FlowGraph(platform._platform)
flowgraph.import_data(initial_state)
return cls(flowgraph)
return cls(flowgraph, platform=platform)

View File

@ -2,12 +2,13 @@ from __future__ import annotations
import os
from pathlib import Path
from typing import Optional
from gnuradio.grc.core.platform import Platform
from gnuradio_mcp.middlewares.base import ElementMiddleware
from gnuradio_mcp.middlewares.flowgraph import FlowGraphMiddleware
from gnuradio_mcp.models import BlockTypeModel
from gnuradio_mcp.models import BlockTypeDetailModel, BlockTypeModel
class PlatformMiddleware(ElementMiddleware):
@ -92,3 +93,72 @@ class PlatformMiddleware(ElementMiddleware):
def save_flowgraph(self, filepath: str, flowgraph: FlowGraphMiddleware) -> None:
self._platform.save_flow_graph(filepath, flowgraph._flowgraph)
# ──────────────────────────────────────────
# Gap 2: Load Existing Flowgraph
# ──────────────────────────────────────────
def load_flowgraph(self, filepath: str) -> FlowGraphMiddleware:
"""Load an existing .grc file, replacing the current flowgraph."""
return FlowGraphMiddleware.from_file(self, filepath)
# ──────────────────────────────────────────
# Gap 5: Search/Browse Blocks by Category
# ──────────────────────────────────────────
def search_blocks(
self,
query: str = "",
category: Optional[str] = None,
) -> list[BlockTypeDetailModel]:
"""Search available blocks by keyword and/or category.
Args:
query: Substring match against key, label, or documentation.
category: Filter to blocks in this category (case-insensitive).
Matches if any element in the block's category path
contains the string.
"""
results = []
query_lower = query.lower()
category_lower = category.lower() if category else None
for block_type in self._platform.blocks.values():
# Category filter
if category_lower:
block_cats = [c.lower() for c in (block_type.category or [])]
if not any(category_lower in c for c in block_cats):
continue
# Query filter (empty query matches everything)
if query_lower:
searchable = (
block_type.key.lower()
+ " "
+ block_type.label.lower()
)
if hasattr(block_type, "documentation"):
doc = block_type.documentation
if isinstance(doc, dict):
searchable += " " + doc.get("", "").lower()
elif isinstance(doc, str):
searchable += " " + doc.lower()
if query_lower not in searchable:
continue
results.append(BlockTypeDetailModel.from_block_type(block_type))
return results
def get_block_categories(self) -> dict[str, list[str]]:
"""Get the full category tree with block keys per category.
Returns a dict mapping category path (joined with '/') to
list of block keys in that category.
"""
categories: dict[str, list[str]] = {}
for block_type in self._platform.blocks.values():
cat_path = "/".join(block_type.category) if block_type.category else "(uncategorized)"
categories.setdefault(cat_path, []).append(block_type.key)
return dict(sorted(categories.items()))

View File

@ -262,3 +262,96 @@ class CoverageReportModel(BaseModel):
container_name: str
format: Literal["html", "xml", "json"]
report_path: str
# ──────────────────────────────────────────────
# Platform / Design-Time Models (Phase 3: Gap Fills)
# ──────────────────────────────────────────────
class BlockTypeDetailModel(BaseModel):
"""Extended block type info with category for search/browsing."""
label: str
key: str
category: list[str] = []
documentation: str = ""
flags: list[str] = []
deprecated: bool = False
@classmethod
def from_block_type(cls, block: Type[Block]) -> BlockTypeDetailModel:
flags = []
if hasattr(block, "flags") and hasattr(block.flags, "data"):
flags = sorted(block.flags.data)
doc = ""
if hasattr(block, "documentation") and isinstance(block.documentation, dict):
doc = block.documentation.get("", "")
deprecated = False
if hasattr(block, "is_deprecated") and callable(block.is_deprecated):
try:
# is_deprecated() requires an instance; check category fallback
deprecated = any(
"deprecated" in c.lower()
for c in (block.category or [])
)
except Exception:
pass
return cls(
label=block.label,
key=block.key,
category=list(block.category) if block.category else [],
documentation=doc,
flags=flags,
deprecated=deprecated,
)
class GeneratedFileModel(BaseModel):
"""A single generated file."""
filename: str
content: str
is_main: bool = False
class GeneratedCodeModel(BaseModel):
"""Generated code from a flowgraph.
Unlike grcc, code generation does NOT block on validation errors.
The ``is_valid`` and ``warnings`` fields report validation state
without gating generation.
"""
files: list[GeneratedFileModel]
generate_options: str
flowgraph_id: str
output_dir: str = ""
is_valid: bool = True
warnings: list[ErrorModel] = []
class FlowgraphOptionsModel(BaseModel):
"""Flowgraph-level options from the 'options' block."""
id: str
title: str = ""
author: str = ""
description: str = ""
generate_options: str = ""
run_options: str = ""
output_language: str = ""
catch_exceptions: str = ""
all_params: dict[str, Any] = {}
class EmbeddedBlockIOModel(BaseModel):
"""I/O signature extracted from embedded Python block source."""
name: str
cls: str
params: list[tuple[str, str]]
sinks: list[tuple[str, str, int]]
sources: list[tuple[str, str, int]]
doc: str = ""
callbacks: list[str] = []

View File

@ -1,13 +1,16 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from gnuradio_mcp.middlewares.platform import PlatformMiddleware
from gnuradio_mcp.models import (
SINK,
SOURCE,
BlockModel,
BlockTypeDetailModel,
BlockTypeModel,
ConnectionModel,
ErrorModel,
FlowgraphOptionsModel,
GeneratedCodeModel,
ParamModel,
PortModel,
)
@ -125,3 +128,101 @@ class PlatformProvider:
- blocks_after: Block count after reload
"""
return self._platform_mw.load_oot_paths(paths)
##############################################
# Gap 1: Code Generation
##############################################
def generate_code(self, output_dir: str = "") -> GeneratedCodeModel:
"""Generate Python/C++ code from the current flowgraph.
Unlike grcc, this does NOT block on validation errors.
Validation warnings are included in the response for reference.
"""
return self._flowgraph_mw.generate_code(output_dir)
##############################################
# Gap 2: Load Existing Flowgraph
##############################################
def load_flowgraph(self, filepath: str) -> list[BlockModel]:
"""Load a .grc file, replacing the current flowgraph.
Returns the blocks in the newly loaded flowgraph.
"""
self._flowgraph_mw = self._platform_mw.load_flowgraph(filepath)
return self._flowgraph_mw.blocks
##############################################
# Gap 3: Flowgraph Options
##############################################
def get_flowgraph_options(self) -> FlowgraphOptionsModel:
"""Get the flowgraph-level options (title, author, generate_options, etc.)."""
return self._flowgraph_mw.get_flowgraph_options()
def set_flowgraph_options(self, params: Dict[str, Any]) -> bool:
"""Set flowgraph-level options on the 'options' block."""
return self._flowgraph_mw.set_flowgraph_options(params)
##############################################
# Gap 4: Embedded Python Blocks
##############################################
def create_embedded_python_block(
self, source_code: str, block_name: Optional[str] = None
) -> str:
"""Create an embedded Python block from source code.
Returns the block name.
"""
block_model = self._flowgraph_mw.create_embedded_python_block(
source_code, block_name
)
return block_model.name
##############################################
# Gap 5: Search Blocks
##############################################
def search_blocks(
self, query: str = "", category: Optional[str] = None
) -> list[BlockTypeDetailModel]:
"""Search available blocks by keyword and/or category."""
return self._platform_mw.search_blocks(query, category)
def get_block_categories(self) -> dict[str, list[str]]:
"""Get all block categories with their block keys."""
return self._platform_mw.get_block_categories()
##############################################
# Gap 6: Expression Evaluation
##############################################
def evaluate_expression(self, expr: str) -> Any:
"""Evaluate a Python expression in the flowgraph's namespace."""
return self._flowgraph_mw.evaluate_expression(expr)
##############################################
# Gap 7: Block Bypass
##############################################
def bypass_block(self, block_name: str) -> bool:
"""Bypass a block (pass signal through without processing)."""
return self._flowgraph_mw.bypass_block(block_name)
def unbypass_block(self, block_name: str) -> bool:
"""Re-enable a bypassed block."""
return self._flowgraph_mw.unbypass_block(block_name)
##############################################
# Gap 8: Export/Import Flowgraph Data
##############################################
def export_flowgraph_data(self) -> dict:
"""Export the flowgraph as a nested dict (same format as .grc files)."""
return self._flowgraph_mw.export_data()
def import_flowgraph_data(self, data: dict) -> bool:
"""Import flowgraph data from a dict, replacing current contents."""
return self._flowgraph_mw.import_data(data)

View File

@ -11,22 +11,56 @@ class McpPlatformProvider:
self.__init_tools()
def __init_tools(self):
self._mcp_instance.tool(self._platform_provider.get_blocks)
self._mcp_instance.tool(self._platform_provider.make_block)
self._mcp_instance.tool(self._platform_provider.remove_block)
self._mcp_instance.tool(self._platform_provider.get_block_params)
self._mcp_instance.tool(self._platform_provider.set_block_params)
self._mcp_instance.tool(self._platform_provider.get_block_sources)
self._mcp_instance.tool(self._platform_provider.get_block_sinks)
self._mcp_instance.tool(self._platform_provider.get_connections)
self._mcp_instance.tool(self._platform_provider.connect_blocks)
self._mcp_instance.tool(self._platform_provider.disconnect_blocks)
self._mcp_instance.tool(self._platform_provider.validate_block)
self._mcp_instance.tool(self._platform_provider.validate_flowgraph)
self._mcp_instance.tool(self._platform_provider.get_all_errors)
self._mcp_instance.tool(self._platform_provider.save_flowgraph)
self._mcp_instance.tool(self._platform_provider.get_all_available_blocks)
self._mcp_instance.tool(self._platform_provider.load_oot_blocks)
t = self._mcp_instance.tool
p = self._platform_provider
# ── Existing tools ─────────────────────
t(p.get_blocks)
t(p.make_block)
t(p.remove_block)
t(p.get_block_params)
t(p.set_block_params)
t(p.get_block_sources)
t(p.get_block_sinks)
t(p.get_connections)
t(p.connect_blocks)
t(p.disconnect_blocks)
t(p.validate_block)
t(p.validate_flowgraph)
t(p.get_all_errors)
t(p.save_flowgraph)
t(p.get_all_available_blocks)
# ── OOT Block Loading ──────────────────
t(p.load_oot_blocks)
# ── Gap 1: Code Generation ─────────────
t(p.generate_code)
# ── Gap 2: Load Flowgraph ──────────────
t(p.load_flowgraph)
# ── Gap 3: Flowgraph Options ───────────
t(p.get_flowgraph_options)
t(p.set_flowgraph_options)
# ── Gap 4: Embedded Python Blocks ──────
t(p.create_embedded_python_block)
# ── Gap 5: Search / Categories ─────────
t(p.search_blocks)
t(p.get_block_categories)
# ── Gap 6: Expression Evaluation ───────
t(p.evaluate_expression)
# ── Gap 7: Block Bypass ────────────────
t(p.bypass_block)
t(p.unbypass_block)
# ── Gap 8: Export/Import Data ──────────
t(p.export_flowgraph_data)
t(p.import_flowgraph_data)
@property
def app(self) -> FastMCP:

View File

@ -89,6 +89,7 @@ class RuntimeProvider:
controlport_port: int = 9090,
enable_perf_counters: bool = True,
device_paths: list[str] | None = None,
image: str | None = None,
) -> ContainerModel:
"""Launch a flowgraph in a Docker container with Xvfb.
@ -102,6 +103,7 @@ class RuntimeProvider:
controlport_port: Port for ControlPort (default 9090)
enable_perf_counters: Enable performance counters (requires controlport)
device_paths: Host device paths to pass through
image: Docker image to use (e.g., 'gnuradio-lora-runtime:latest')
"""
docker = self._require_docker()
if name is None:
@ -116,6 +118,7 @@ class RuntimeProvider:
controlport_port=controlport_port,
enable_perf_counters=enable_perf_counters,
device_paths=device_paths,
image=image,
)
def list_containers(self) -> list[ContainerModel]:

View File

@ -0,0 +1,265 @@
"""Tests for the capability gap fill features (Gaps 1-8).
These tests validate the new middleware and provider methods added to
close the gap between gr-mcp and grcc/GRC.
"""
from __future__ import annotations
import pytest
from gnuradio_mcp.middlewares.flowgraph import FlowGraphMiddleware
from gnuradio_mcp.middlewares.platform import PlatformMiddleware
from gnuradio_mcp.models import (
BlockModel,
BlockTypeDetailModel,
FlowgraphOptionsModel,
GeneratedCodeModel,
)
@pytest.fixture
def flowgraph_middleware(platform_middleware: PlatformMiddleware):
return platform_middleware.make_flowgraph()
# ──────────────────────────────────────────────
# Gap 3: Flowgraph Options
# ──────────────────────────────────────────────
def test_get_flowgraph_options(flowgraph_middleware: FlowGraphMiddleware):
opts = flowgraph_middleware.get_flowgraph_options()
assert isinstance(opts, FlowgraphOptionsModel)
assert opts.id # Default flowgraph has an id
assert opts.generate_options # Should have a generate_options set
def test_set_flowgraph_options(flowgraph_middleware: FlowGraphMiddleware):
flowgraph_middleware.set_flowgraph_options({
"title": "Test Flowgraph",
"author": "gr-mcp tests",
})
opts = flowgraph_middleware.get_flowgraph_options()
assert opts.title == "Test Flowgraph"
assert opts.author == "gr-mcp tests"
def test_set_invalid_option_raises(flowgraph_middleware: FlowGraphMiddleware):
with pytest.raises(KeyError, match="nonexistent_key"):
flowgraph_middleware.set_flowgraph_options({"nonexistent_key": "value"})
# ──────────────────────────────────────────────
# Gap 4: Embedded Python Blocks
# ──────────────────────────────────────────────
EPY_SOURCE = '''\
import numpy as np
from gnuradio import gr
class blk(gr.sync_block):
"""Test embedded block - multiply by constant"""
def __init__(self, gain=1.0):
gr.sync_block.__init__(
self, name='Test Gain Block',
in_sig=[np.float32], out_sig=[np.float32]
)
self.gain = gain
def work(self, input_items, output_items):
output_items[0][:] = input_items[0] * self.gain
return len(output_items[0])
'''
def test_create_embedded_python_block(flowgraph_middleware: FlowGraphMiddleware):
model = flowgraph_middleware.create_embedded_python_block(EPY_SOURCE, "my_gain")
assert isinstance(model, BlockModel)
assert model.name == "my_gain"
# Verify the block is in the flowgraph
assert any(b.name == "my_gain" for b in flowgraph_middleware.blocks)
def test_embedded_block_auto_names(flowgraph_middleware: FlowGraphMiddleware):
model = flowgraph_middleware.create_embedded_python_block(EPY_SOURCE)
assert "epy_block" in model.name
# ──────────────────────────────────────────────
# Gap 5: Search Blocks
# ──────────────────────────────────────────────
def test_search_blocks_by_query(platform_middleware: PlatformMiddleware):
results = platform_middleware.search_blocks(query="throttle")
assert len(results) > 0
assert all(isinstance(r, BlockTypeDetailModel) for r in results)
assert any("throttle" in r.key.lower() for r in results)
def test_search_blocks_by_category(platform_middleware: PlatformMiddleware):
# GRC categories use names like "Core", "Waveform Generators", etc.
results = platform_middleware.search_blocks(category="Waveform Generators")
assert len(results) > 0
for r in results:
assert any("waveform generators" in c.lower() for c in r.category)
def test_search_blocks_empty_query(platform_middleware: PlatformMiddleware):
# Empty query should return all blocks
all_results = platform_middleware.search_blocks()
all_blocks = platform_middleware.blocks
assert len(all_results) == len(all_blocks)
def test_search_blocks_no_match(platform_middleware: PlatformMiddleware):
results = platform_middleware.search_blocks(query="zzz_nonexistent_block_xyz")
assert results == []
def test_get_block_categories(platform_middleware: PlatformMiddleware):
cats = platform_middleware.get_block_categories()
assert isinstance(cats, dict)
assert len(cats) > 0
# Each value should be a list of block keys
for _cat, keys in cats.items():
assert isinstance(keys, list)
assert all(isinstance(k, str) for k in keys)
# ──────────────────────────────────────────────
# Gap 6: Expression Evaluation
# ──────────────────────────────────────────────
def test_evaluate_simple_expression(flowgraph_middleware: FlowGraphMiddleware):
result = flowgraph_middleware.evaluate_expression("2 + 2")
assert result == 4
def test_evaluate_variable_expression(flowgraph_middleware: FlowGraphMiddleware):
# Default flowgraph has samp_rate variable
result = flowgraph_middleware.evaluate_expression("samp_rate")
assert result == 32000 # Default value
# ──────────────────────────────────────────────
# Gap 7: Block Bypass
# ──────────────────────────────────────────────
def test_bypass_single_io_block(flowgraph_middleware: FlowGraphMiddleware):
# blocks_multiply_const_vxx has 1 input, 1 output of same type — bypassable
model = flowgraph_middleware.add_block("blocks_multiply_const_vxx")
result = flowgraph_middleware.bypass_block(model.name)
assert result is True
# Verify state
block_mw = flowgraph_middleware.get_block(model.name)
assert block_mw._block.state == "bypassed"
def test_unbypass_block(flowgraph_middleware: FlowGraphMiddleware):
model = flowgraph_middleware.add_block("blocks_multiply_const_vxx")
flowgraph_middleware.bypass_block(model.name)
result = flowgraph_middleware.unbypass_block(model.name)
assert result is True
block_mw = flowgraph_middleware.get_block(model.name)
assert block_mw._block.state == "enabled"
def test_bypass_multi_io_block_raises(flowgraph_middleware: FlowGraphMiddleware):
# blocks_copy has a hidden message port — 2 sinks — cannot be bypassed
model = flowgraph_middleware.add_block("blocks_copy")
with pytest.raises(ValueError, match="cannot be bypassed"):
flowgraph_middleware.bypass_block(model.name)
# ──────────────────────────────────────────────
# Gap 8: Export/Import Flowgraph Data
# ──────────────────────────────────────────────
def test_export_data(flowgraph_middleware: FlowGraphMiddleware):
data = flowgraph_middleware.export_data()
assert isinstance(data, dict)
assert "blocks" in data or "options" in data
def test_roundtrip_export_import(
flowgraph_middleware: FlowGraphMiddleware,
platform_middleware: PlatformMiddleware,
):
# Add a block, export, create fresh flowgraph, import
flowgraph_middleware.add_block("analog_sig_source_x", "my_sig_source")
exported = flowgraph_middleware.export_data()
new_fg = platform_middleware.make_flowgraph()
new_fg.import_data(exported)
assert any(b.name == "my_sig_source" for b in new_fg.blocks)
# ──────────────────────────────────────────────
# Gap 1: Code Generation
# ──────────────────────────────────────────────
def test_generate_code_produces_output(flowgraph_middleware: FlowGraphMiddleware):
result = flowgraph_middleware.generate_code()
assert isinstance(result, GeneratedCodeModel)
assert len(result.files) > 0
assert result.flowgraph_id
assert result.generate_options
# Should have at least one main file
main_files = [f for f in result.files if f.is_main]
assert len(main_files) >= 1
def test_generate_code_contains_python(flowgraph_middleware: FlowGraphMiddleware):
result = flowgraph_middleware.generate_code()
main = next((f for f in result.files if f.is_main), None)
assert main is not None
# Generated Python code should contain typical markers
assert "import" in main.content or "#include" in main.content
def test_generate_code_with_output_dir(flowgraph_middleware: FlowGraphMiddleware):
"""Files persist on disk when output_dir is specified."""
import os
import tempfile
output_dir = tempfile.mkdtemp(prefix="gr_mcp_test_")
result = flowgraph_middleware.generate_code(output_dir=output_dir)
assert result.output_dir == output_dir
# Files should exist on disk
main = next((f for f in result.files if f.is_main), None)
assert main is not None
assert os.path.exists(os.path.join(output_dir, main.filename))
def test_generate_code_returns_validation_state(
flowgraph_middleware: FlowGraphMiddleware,
):
"""generate_code includes is_valid and warnings in response."""
result = flowgraph_middleware.generate_code()
assert isinstance(result.is_valid, bool)
assert isinstance(result.warnings, list)
def test_generate_code_default_output_persists(
flowgraph_middleware: FlowGraphMiddleware,
):
"""Default temp dir persists files (not cleaned up after call)."""
import os
result = flowgraph_middleware.generate_code()
assert result.output_dir # Should have a temp path
assert os.path.isdir(result.output_dir) # Dir still exists
main = next((f for f in result.files if f.is_main), None)
if main:
assert os.path.exists(os.path.join(result.output_dir, main.filename))

View File

@ -132,6 +132,7 @@ class TestContainerLifecycle:
controlport_port=9090,
enable_perf_counters=True,
device_paths=None,
image=None,
)
def test_launch_flowgraph_auto_name(