diff --git a/src/gnuradio_mcp/middlewares/oot.py b/src/gnuradio_mcp/middlewares/oot.py index 84d3f7d..19b504c 100644 --- a/src/gnuradio_mcp/middlewares/oot.py +++ b/src/gnuradio_mcp/middlewares/oot.py @@ -9,7 +9,15 @@ from datetime import datetime, timezone from pathlib import Path from typing import Any -from gnuradio_mcp.models import ComboImageInfo, ComboImageResult, OOTImageInfo, OOTInstallResult +import re + +from gnuradio_mcp.models import ( + ComboImageInfo, + ComboImageResult, + OOTDetectionResult, + OOTImageInfo, + OOTInstallResult, +) logger = logging.getLogger(__name__) @@ -549,3 +557,206 @@ class OOTInstallerMiddleware: self._combo_registry_path.parent.mkdir(parents=True, exist_ok=True) data = {k: v.model_dump() for k, v in self._combo_registry.items()} self._combo_registry_path.write_text(json.dumps(data, indent=2)) + + # ────────────────────────────────────────── + # OOT Module Detection + # ────────────────────────────────────────── + + # Core GNU Radio module prefixes to ignore during detection + _CORE_BLOCK_PREFIXES = frozenset([ + "blocks_", "analog_", "digital_", "filter_", "qtgui_", "fft_", + "audio_", "channels_", "trellis_", "vocoder_", "video_sdl_", + "dtv_", "fec_", "network_", "pdu_", "soapy_", "uhd_", "zeromq_", + "variable_", # All variable_* blocks (qtgui controls, function probe, etc.) + ]) + _CORE_BLOCK_EXACT = frozenset([ + # Special blocks without prefixes + "variable", # The basic variable block (no underscore) + "import", "snippet", "options", "parameter", + "pad_source", "pad_sink", "virtual_source", "virtual_sink", + "note", "epy_block", "epy_module", + # Core filter blocks (not prefixed with filter_) + "low_pass_filter", "high_pass_filter", "band_pass_filter", + "band_reject_filter", "root_raised_cosine_filter", + # XML-RPC (part of core GR) + "xmlrpc_server", "xmlrpc_client", + ]) + + # Explicit block ID → module mappings for blocks that don't follow + # the standard `module_prefix_` naming convention + _BLOCK_TO_MODULE: dict[str, str] = { + # gr-lora_sdr: main blocks use short names + "lora_rx": "lora_sdr", + "lora_tx": "lora_sdr", + # gr-adsb: decoder blocks + "adsb_decoder": "adsb", + "adsb_framer": "adsb", + # gr-iridium: main blocks + "iridium_extractor": "iridium", + "iridium_frame_sorter": "iridium", + "iridium_qpsk_demod": "iridium", + # gr-rds: decoder/encoder + "rds_decoder": "rds", + "rds_encoder": "rds", + "rds_parser": "rds", + "rds_panel": "rds", + # gr-satellites: common blocks + "satellites_ax25_deframer": "satellites", + "satellites_hdlc_deframer": "satellites", + "satellites_nrzi_decode": "satellites", + # gr-gsm: receiver blocks + "gsm_receiver": "gsm", + "gsm_input": "gsm", + "gsm_clock_offset_control": "gsm", + "gsm_controlled_rotator_cc": "gsm", + } + + def detect_required_modules(self, flowgraph_path: str) -> OOTDetectionResult: + """Detect OOT modules required by a flowgraph. + + For .py files: parse Python imports + For .grc files: heuristic prefix matching against catalog + + Args: + flowgraph_path: Path to a .py or .grc flowgraph file + + Returns: + OOTDetectionResult with detected modules and recommended image + """ + path = Path(flowgraph_path) + + if not path.exists(): + raise FileNotFoundError(f"Flowgraph not found: {flowgraph_path}") + + if path.suffix == ".py": + return self._detect_from_python(path) + elif path.suffix == ".grc": + return self._detect_from_grc(path) + else: + raise ValueError( + f"Unsupported file type: {path.suffix}. " + f"Expected .py or .grc" + ) + + def _detect_from_python(self, path: Path) -> OOTDetectionResult: + """Parse Python imports to find OOT modules.""" + from gnuradio_mcp.oot_catalog import CATALOG + + content = path.read_text() + modules: set[str] = set() + + # Pattern: import gnuradio.MODULE or from gnuradio import MODULE + for match in re.finditer( + r"(?:import gnuradio\.(\w+)|from gnuradio import (\w+))", content + ): + module = match.group(1) or match.group(2) + if module in CATALOG: + modules.add(module) + + # Pattern: import MODULE (top-level OOT like osmosdr) + for match in re.finditer(r"^import (\w+)\s*$", content, re.MULTILINE): + module = match.group(1) + if module in CATALOG: + modules.add(module) + + # Pattern: from MODULE import ... (top-level OOT) + for match in re.finditer(r"^from (\w+) import", content, re.MULTILINE): + module = match.group(1) + if module in CATALOG: + modules.add(module) + + sorted_modules = sorted(modules) + return OOTDetectionResult( + flowgraph_path=str(path), + detected_modules=sorted_modules, + detection_method="python_imports", + recommended_image=self._recommend_image(sorted_modules), + ) + + def _detect_from_grc(self, path: Path) -> OOTDetectionResult: + """Heuristic: match block IDs against catalog module prefixes.""" + import yaml + + from gnuradio_mcp.oot_catalog import CATALOG + + data = yaml.safe_load(path.read_text()) + blocks = data.get("blocks", []) + block_ids = [b.get("id", "") for b in blocks if isinstance(b, dict)] + + modules: set[str] = set() + unknown: list[str] = [] + + for block_id in block_ids: + # Skip empty IDs + if not block_id: + continue + + # Skip core GNU Radio blocks + if block_id in self._CORE_BLOCK_EXACT: + continue + if any(block_id.startswith(prefix) for prefix in self._CORE_BLOCK_PREFIXES): + continue + + # Phase 1: Check explicit block-to-module mapping (handles edge cases) + if block_id in self._BLOCK_TO_MODULE: + module = self._BLOCK_TO_MODULE[block_id] + if module in CATALOG: + modules.add(module) + continue + + # Phase 2: Match against catalog module names as prefixes + matched = False + for module_name in CATALOG: + # Check for prefix match (e.g., "lora_sdr_gray_demap" -> "lora_sdr") + # Also handle cases like "osmosdr_source" -> "osmosdr" + if block_id.startswith(f"{module_name}_") or block_id == module_name: + modules.add(module_name) + matched = True + break + + if not matched and "_" in block_id: + # Looks like an OOT block but not in catalog + # Don't flag all unknown blocks, just those with OOT-like patterns + prefix = block_id.split("_")[0] + # Check it's not a known core prefix that might have been missed + if not any(p.startswith(prefix) for p in self._CORE_BLOCK_PREFIXES): + unknown.append(block_id) + + sorted_modules = sorted(modules) + return OOTDetectionResult( + flowgraph_path=str(path), + detected_modules=sorted_modules, + unknown_blocks=unknown, + detection_method="grc_prefix_heuristic", + recommended_image=self._recommend_image(sorted_modules), + ) + + def _recommend_image(self, modules: list[str]) -> str | None: + """Recommend Docker image for detected modules. + + Args: + modules: Sorted list of module names + + Returns: + - Base runtime image if no OOT modules + - Single OOT image tag if one module (and built) + - Combo image tag if multiple modules + """ + if not modules: + return self._base_image + + if len(modules) == 1: + # Single module - check if already built + info = self._registry.get(modules[0]) + if info: + return info.image_tag + # Not built yet - return what the tag would be + return None + + # Multiple modules - return combo tag (may or may not exist yet) + combo_key = self._combo_key(modules) + combo = self._combo_registry.get(combo_key) + if combo: + return combo.image_tag + # Return what the tag would be + return self._combo_image_tag(modules) diff --git a/src/gnuradio_mcp/models.py b/src/gnuradio_mcp/models.py index d6c412e..73a6553 100644 --- a/src/gnuradio_mcp/models.py +++ b/src/gnuradio_mcp/models.py @@ -411,3 +411,17 @@ class ComboImageResult(BaseModel): error: str | None = None skipped: bool = False # True if combo image already existed modules_built: list[str] = [] # modules auto-built from catalog first + + +class OOTDetectionResult(BaseModel): + """Result of OOT module detection from a flowgraph. + + Analyzes .py or .grc files to identify which OOT modules are required, + enabling automatic Docker image selection for launch_flowgraph(). + """ + + flowgraph_path: str + detected_modules: list[str] # OOT modules found (catalog matches) + unknown_blocks: list[str] = [] # Blocks that look OOT but aren't in catalog + detection_method: str # "python_imports" | "grc_prefix_heuristic" + recommended_image: str | None = None # Image tag to use (if modules found) diff --git a/src/gnuradio_mcp/providers/mcp_runtime.py b/src/gnuradio_mcp/providers/mcp_runtime.py index 7c03c9c..9870cfb 100644 --- a/src/gnuradio_mcp/providers/mcp_runtime.py +++ b/src/gnuradio_mcp/providers/mcp_runtime.py @@ -75,6 +75,9 @@ class McpRuntimeProvider: # OOT module installation if p._has_oot: + # Detection (new!) + self._mcp.tool(p.detect_oot_modules) + # Installation self._mcp.tool(p.install_oot_module) self._mcp.tool(p.list_oot_images) self._mcp.tool(p.remove_oot_image) @@ -82,7 +85,7 @@ class McpRuntimeProvider: self._mcp.tool(p.build_multi_oot_image) self._mcp.tool(p.list_combo_images) self._mcp.tool(p.remove_combo_image) - logger.info("Registered 35 runtime tools (Docker + OOT available)") + logger.info("Registered 36 runtime tools (Docker + OOT available)") else: logger.info("Registered 29 runtime tools (Docker available)") else: diff --git a/src/gnuradio_mcp/providers/runtime.py b/src/gnuradio_mcp/providers/runtime.py index fb43857..8e56203 100644 --- a/src/gnuradio_mcp/providers/runtime.py +++ b/src/gnuradio_mcp/providers/runtime.py @@ -20,6 +20,7 @@ from gnuradio_mcp.models import ( CoverageReportModel, KnobModel, KnobPropertiesModel, + OOTDetectionResult, OOTImageInfo, OOTInstallResult, PerfCounterModel, @@ -109,6 +110,7 @@ class RuntimeProvider: enable_perf_counters: bool = True, device_paths: list[str] | None = None, image: str | None = None, + auto_image: bool = False, ) -> ContainerModel: """Launch a flowgraph in a Docker container with Xvfb. @@ -123,8 +125,17 @@ class RuntimeProvider: enable_perf_counters: Enable performance counters (requires controlport) device_paths: Host device paths to pass through image: Docker image to use (e.g., 'gnuradio-lora-runtime:latest') + auto_image: Automatically detect required OOT modules and build + appropriate Docker image. If True and image is not specified, + analyzes the flowgraph to determine OOT dependencies and + builds a single-OOT or combo image as needed. """ docker = self._require_docker() + + # Auto-detect and build image if requested + if auto_image and image is None and self._has_oot: + image = self._auto_select_image(flowgraph_path) + if name is None: name = f"gr-{Path(flowgraph_path).stem}" return docker.launch( @@ -140,6 +151,50 @@ class RuntimeProvider: image=image, ) + def _auto_select_image(self, flowgraph_path: str) -> str | None: + """Detect OOT modules and build/select appropriate image. + + Auto-builds missing modules from catalog when needed. + """ + from gnuradio_mcp.oot_catalog import CATALOG + + oot = self._require_oot() + detection = oot.detect_required_modules(flowgraph_path) + + if not detection.detected_modules: + logger.info("No OOT modules detected, using base runtime image") + return detection.recommended_image + + modules = detection.detected_modules + logger.info("Detected OOT modules: %s", modules) + + if len(modules) == 1: + # Single module - ensure it's built + module = modules[0] + if module not in oot._registry: + entry = CATALOG.get(module) + if entry: + logger.info("Auto-building module '%s' from catalog", module) + result = oot.build_module( + git_url=entry.git_url, + branch=entry.branch, + build_deps=entry.build_deps or None, + cmake_args=entry.cmake_args or None, + ) + if not result.success: + logger.error("Auto-build of '%s' failed: %s", module, result.error) + return None + info = oot._registry.get(module) + return info.image_tag if info else None + else: + # Multiple modules - build combo + logger.info("Building combo image for modules: %s", modules) + result = oot.build_combo_image(modules) + if result.success and result.image: + return result.image.image_tag + logger.error("Combo image build failed: %s", result.error) + return None + def list_containers(self) -> list[ContainerModel]: """List all gr-mcp managed containers.""" docker = self._require_docker() @@ -683,9 +738,37 @@ class RuntimeProvider: return deleted # ────────────────────────────────────────── - # OOT Module Installation + # OOT Module Detection & Installation # ────────────────────────────────────────── + def detect_oot_modules(self, flowgraph_path: str) -> OOTDetectionResult: + """Detect which OOT modules a flowgraph requires. + + Analyzes .py or .grc files to find OOT module dependencies. + Returns recommended Docker image to use with launch_flowgraph(). + + For .py files: parses Python imports (most accurate) + For .grc files: uses heuristic prefix matching against the + OOT catalog (fast, no Docker required) + + Args: + flowgraph_path: Path to a .py or .grc flowgraph file + + Returns: + OOTDetectionResult with detected modules, unknown blocks, + and recommended image tag. + + Example: + result = detect_oot_modules("lora_rx.grc") + # -> detected_modules=["lora_sdr", "osmosdr"] + # -> recommended_image="gr-combo-lora_sdr-osmosdr:latest" + + # Then launch with auto-built image: + launch_flowgraph("lora_rx.py", auto_image=True) + """ + oot = self._require_oot() + return oot.detect_required_modules(flowgraph_path) + def install_oot_module( self, git_url: str, diff --git a/tests/unit/test_oot_installer.py b/tests/unit/test_oot_installer.py index 96b5865..b757041 100644 --- a/tests/unit/test_oot_installer.py +++ b/tests/unit/test_oot_installer.py @@ -1,7 +1,7 @@ """Unit tests for OOT module installer middleware. Tests Dockerfile generation, module name extraction, registry persistence, -and image naming — all without requiring Docker. +image naming, and OOT module detection — all without requiring Docker. """ import json @@ -11,7 +11,7 @@ from unittest.mock import MagicMock import pytest from gnuradio_mcp.middlewares.oot import OOTInstallerMiddleware -from gnuradio_mcp.models import ComboImageInfo, OOTImageInfo +from gnuradio_mcp.models import ComboImageInfo, OOTDetectionResult, OOTImageInfo @pytest.fixture @@ -592,3 +592,256 @@ class TestBuildComboImage: result = oot.build_combo_image(["adsb", "lora_sdr"], force=True) assert result.success is True assert result.skipped is False + + +# ────────────────────────────────────────── +# OOT Detection from Python Files +# ────────────────────────────────────────── + + +class TestDetectFromPython: + def test_detects_gnuradio_import(self, oot, tmp_path): + """Detects 'import gnuradio.MODULE' pattern.""" + py_file = tmp_path / "test.py" + py_file.write_text( + "import gnuradio.lora_sdr as lora_sdr\n" + "from gnuradio import blocks\n" + ) + result = oot.detect_required_modules(str(py_file)) + assert result.detected_modules == ["lora_sdr"] + assert result.detection_method == "python_imports" + + def test_detects_from_gnuradio_import(self, oot, tmp_path): + """Detects 'from gnuradio import MODULE' pattern.""" + py_file = tmp_path / "test.py" + py_file.write_text("from gnuradio import adsb, blocks, analog\n") + result = oot.detect_required_modules(str(py_file)) + assert result.detected_modules == ["adsb"] + + def test_detects_multiple_modules(self, oot, tmp_path): + """Detects multiple OOT modules in one file.""" + py_file = tmp_path / "test.py" + py_file.write_text( + "import gnuradio.lora_sdr as lora_sdr\n" + "from gnuradio import adsb\n" + ) + result = oot.detect_required_modules(str(py_file)) + assert result.detected_modules == ["adsb", "lora_sdr"] + + def test_detects_toplevel_osmosdr(self, oot, tmp_path): + """Detects top-level 'import osmosdr' (not in gnuradio namespace).""" + py_file = tmp_path / "test.py" + py_file.write_text("import osmosdr\n") + result = oot.detect_required_modules(str(py_file)) + assert "osmosdr" in result.detected_modules + + def test_detects_from_toplevel_import(self, oot, tmp_path): + """Detects 'from MODULE import ...' for top-level OOT.""" + py_file = tmp_path / "test.py" + py_file.write_text("from osmosdr import source\n") + result = oot.detect_required_modules(str(py_file)) + assert "osmosdr" in result.detected_modules + + def test_ignores_core_modules(self, oot, tmp_path): + """Ignores core GNU Radio modules not in catalog.""" + py_file = tmp_path / "test.py" + py_file.write_text( + "from gnuradio import blocks, analog, gr, digital, filter\n" + "import numpy\n" + ) + result = oot.detect_required_modules(str(py_file)) + assert result.detected_modules == [] + + def test_no_oot_returns_empty(self, oot, tmp_path): + """Returns empty list when no OOT imports found.""" + py_file = tmp_path / "test.py" + py_file.write_text("from gnuradio import blocks\nprint('hello')\n") + result = oot.detect_required_modules(str(py_file)) + assert result.detected_modules == [] + + +# ────────────────────────────────────────── +# OOT Detection from GRC Files +# ────────────────────────────────────────── + + +class TestDetectFromGrc: + def test_detects_prefixed_blocks(self, oot, tmp_path): + """Detects blocks with OOT module prefix.""" + grc_file = tmp_path / "test.grc" + grc_file.write_text( + """ +blocks: + - id: lora_sdr_gray_demap + name: gray0 + - id: adsb_demod + name: demod0 + - id: blocks_null_sink + name: null0 +""" + ) + result = oot.detect_required_modules(str(grc_file)) + assert result.detected_modules == ["adsb", "lora_sdr"] + assert result.detection_method == "grc_prefix_heuristic" + + def test_detects_exact_match_blocks(self, oot, tmp_path): + """Detects blocks that exactly match module name.""" + grc_file = tmp_path / "test.grc" + grc_file.write_text( + """ +blocks: + - id: osmosdr_source + name: src0 +""" + ) + result = oot.detect_required_modules(str(grc_file)) + assert "osmosdr" in result.detected_modules + + def test_reports_unknown_blocks(self, oot, tmp_path): + """Reports blocks that look OOT but aren't in catalog.""" + grc_file = tmp_path / "test.grc" + grc_file.write_text( + """ +blocks: + - id: unknown_module_block + name: blk0 + - id: another_weird_thing + name: blk1 +""" + ) + result = oot.detect_required_modules(str(grc_file)) + assert "unknown_module_block" in result.unknown_blocks + assert "another_weird_thing" in result.unknown_blocks + + def test_ignores_core_blocks(self, oot, tmp_path): + """Ignores core GNU Radio blocks.""" + grc_file = tmp_path / "test.grc" + grc_file.write_text( + """ +blocks: + - id: blocks_null_sink + name: null0 + - id: analog_sig_source_x + name: src0 + - id: digital_constellation_decoder_cb + name: dec0 + - id: qtgui_time_sink_x + name: time0 + - id: filter_fir_filter_xxx + name: fir0 +""" + ) + result = oot.detect_required_modules(str(grc_file)) + assert result.detected_modules == [] + assert result.unknown_blocks == [] + + def test_ignores_special_blocks(self, oot, tmp_path): + """Ignores variables, imports, and other special blocks.""" + grc_file = tmp_path / "test.grc" + grc_file.write_text( + """ +blocks: + - id: variable + name: samp_rate + - id: variable_qtgui_range + name: freq + - id: import + name: import_0 + - id: options + name: opts + - id: pad_source + name: in0 + - id: virtual_sink + name: vsink0 +""" + ) + result = oot.detect_required_modules(str(grc_file)) + assert result.detected_modules == [] + assert result.unknown_blocks == [] + + def test_handles_empty_blocks(self, oot, tmp_path): + """Handles GRC with no blocks.""" + grc_file = tmp_path / "test.grc" + grc_file.write_text("blocks: []\n") + result = oot.detect_required_modules(str(grc_file)) + assert result.detected_modules == [] + + def test_handles_missing_blocks_key(self, oot, tmp_path): + """Handles GRC without blocks key.""" + grc_file = tmp_path / "test.grc" + grc_file.write_text("options:\n id: test\n") + result = oot.detect_required_modules(str(grc_file)) + assert result.detected_modules == [] + + +# ────────────────────────────────────────── +# Image Recommendation Logic +# ────────────────────────────────────────── + + +class TestRecommendImage: + def test_recommends_base_for_no_modules(self, oot): + """Returns base runtime image when no OOT modules.""" + result = oot._recommend_image([]) + assert result == "gnuradio-runtime:latest" + + def test_recommends_single_oot_image_if_built(self, oot): + """Returns single OOT image tag if already built.""" + oot._registry["lora_sdr"] = _make_oot_info( + "lora_sdr", "gr-oot-lora_sdr:master-abc1234" + ) + result = oot._recommend_image(["lora_sdr"]) + assert result == "gr-oot-lora_sdr:master-abc1234" + + def test_returns_none_if_single_not_built(self, oot): + """Returns None if single module not yet built.""" + result = oot._recommend_image(["lora_sdr"]) + assert result is None + + def test_recommends_combo_tag_if_exists(self, oot): + """Returns existing combo image tag.""" + combo_info = ComboImageInfo( + combo_key="combo:adsb+lora_sdr", + image_tag="gr-combo-adsb-lora_sdr:latest", + modules=[], + built_at="2025-01-01T00:00:00+00:00", + ) + oot._combo_registry["combo:adsb+lora_sdr"] = combo_info + result = oot._recommend_image(["adsb", "lora_sdr"]) + assert result == "gr-combo-adsb-lora_sdr:latest" + + def test_recommends_combo_tag_format_if_not_built(self, oot): + """Returns what combo tag would be if not yet built.""" + result = oot._recommend_image(["adsb", "lora_sdr"]) + assert result == "gr-combo-adsb-lora_sdr:latest" + + def test_combo_tag_sorted_alphabetically(self, oot): + """Combo tag always uses sorted module names.""" + result = oot._recommend_image(["lora_sdr", "adsb"]) + assert result == "gr-combo-adsb-lora_sdr:latest" + + +# ────────────────────────────────────────── +# Edge Cases and Error Handling +# ────────────────────────────────────────── + + +class TestDetectionEdgeCases: + def test_file_not_found(self, oot, tmp_path): + """Raises FileNotFoundError for missing file.""" + with pytest.raises(FileNotFoundError, match="Flowgraph not found"): + oot.detect_required_modules(str(tmp_path / "does_not_exist.py")) + + def test_unsupported_extension(self, oot, tmp_path): + """Raises ValueError for unsupported file type.""" + txt_file = tmp_path / "test.txt" + txt_file.write_text("some content") + with pytest.raises(ValueError, match="Unsupported file type"): + oot.detect_required_modules(str(txt_file)) + + def test_result_includes_flowgraph_path(self, oot, tmp_path): + """Result includes the analyzed path.""" + py_file = tmp_path / "my_flowgraph.py" + py_file.write_text("from gnuradio import blocks\n") + result = oot.detect_required_modules(str(py_file)) + assert str(py_file) in result.flowgraph_path