gr-mcp/examples/block_dev_demo.py
Ryan Malloy 5db7d71d2b feat: add AI-assisted block development tools
Implements complete workflow for generating GNU Radio blocks from
descriptions:

Block Generation:
- generate_sync_block, generate_basic_block, generate_interp_block,
  generate_decim_block tools for creating different block types
- Template-based code generation with customizable work logic
- Automatic validation via AST parsing and signature checking

Protocol Analysis:
- Parse protocol specifications into structured models
- Generate decoder pipelines matching modulation to demodulator blocks
- Templates for BLE, Zigbee, LoRa, POCSAG, ADS-B protocols

OOT Export:
- Export generated blocks to full OOT module structure
- Generate CMakeLists.txt, block YAML, Python modules
- gr_modtool-compatible output

Dynamic Tool Registration:
- enable_block_dev_mode/disable_block_dev_mode for context management
- Tools only registered when needed (reduces LLM context usage)

Includes comprehensive test coverage and end-to-end demo.
2026-02-09 12:36:54 -07:00

311 lines
13 KiB
Python

#!/usr/bin/env python3
"""End-to-End Demo: AI-Assisted Block Development Workflow
This example demonstrates the complete workflow for developing custom
GNU Radio blocks using gr-mcp's AI-assisted block development tools:
1. Describe a signal processing need
2. Generate block code from the description
3. Validate the generated code
4. Test the block (optionally in Docker)
5. Export to a full OOT module
Run this example with:
python examples/block_dev_demo.py
Or use the MCP tools interactively with Claude:
claude -p "Enable block dev mode and generate a gain block"
"""
import asyncio
import sys
import tempfile
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from fastmcp import Client
# Import the MCP app
from main import app as mcp_app
async def demo_block_generation():
"""Demonstrate the complete block generation workflow."""
print("=" * 60)
print(" GR-MCP Block Development Demo")
print(" Complete Workflow: Generate -> Validate -> Test -> Export")
print("=" * 60)
print()
async with Client(mcp_app) as client:
# ──────────────────────────────────────────
# Step 1: Enable Block Development Mode
# ──────────────────────────────────────────
print("[Step 1] Enabling block development mode...")
result = await client.call_tool(name="enable_block_dev_mode")
print(f" ✓ Block dev mode enabled")
print(f" ✓ Registered {len(result.data.tools_registered)} tools:")
for tool in result.data.tools_registered[:5]:
print(f" - {tool}")
if len(result.data.tools_registered) > 5:
print(f" ... and {len(result.data.tools_registered) - 5} more")
print()
# ──────────────────────────────────────────
# Step 2: Generate a Simple Gain Block
# ──────────────────────────────────────────
print("[Step 2] Generating a configurable gain block...")
gain_result = await client.call_tool(
name="generate_sync_block",
arguments={
"name": "configurable_gain",
"description": "Multiply input samples by a configurable gain factor",
"inputs": [{"dtype": "float", "vlen": 1}],
"outputs": [{"dtype": "float", "vlen": 1}],
"parameters": [
{"name": "gain", "dtype": "float", "default": 1.0}
],
"work_template": "gain",
},
)
print(f" ✓ Block generated: {gain_result.data.block_name}")
print(f" ✓ Validation: {'PASSED' if gain_result.data.is_valid else 'FAILED'}")
print()
print(" Generated code preview:")
print(" " + "-" * 50)
# Show first 15 lines
for i, line in enumerate(gain_result.data.source_code.split("\n")[:15]):
print(f" {line}")
print(" ...")
print(" " + "-" * 50)
print()
# ──────────────────────────────────────────
# Step 3: Generate a More Complex Block
# ──────────────────────────────────────────
print("[Step 3] Generating a threshold detector block...")
threshold_result = await client.call_tool(
name="generate_sync_block",
arguments={
"name": "threshold_detector",
"description": "Output 1.0 when input exceeds threshold, else 0.0",
"inputs": [{"dtype": "float", "vlen": 1}],
"outputs": [{"dtype": "float", "vlen": 1}],
"parameters": [
{"name": "threshold", "dtype": "float", "default": 0.5},
{"name": "hysteresis", "dtype": "float", "default": 0.1},
],
"work_logic": """
# Threshold with hysteresis
upper = self.threshold + self.hysteresis
lower = self.threshold - self.hysteresis
for i in range(len(input_items[0])):
if input_items[0][i] > upper:
output_items[0][i] = 1.0
elif input_items[0][i] < lower:
output_items[0][i] = 0.0
else:
# Maintain previous state (simplified: use 0)
output_items[0][i] = 0.0
""",
},
)
print(f" ✓ Block generated: {threshold_result.data.block_name}")
print(f" ✓ Validation: {'PASSED' if threshold_result.data.is_valid else 'FAILED'}")
print()
# ──────────────────────────────────────────
# Step 4: Validate Code Independently
# ──────────────────────────────────────────
print("[Step 4] Independent code validation...")
validation = await client.call_tool(
name="validate_block_code",
arguments={"source_code": gain_result.data.source_code},
)
print(f" ✓ Syntax check: {'PASSED' if validation.data.is_valid else 'FAILED'}")
if validation.data.warnings:
for warn in validation.data.warnings:
print(f" ⚠ Warning: {warn}")
print()
# ──────────────────────────────────────────
# Step 5: Generate a Decimating Block
# ──────────────────────────────────────────
print("[Step 5] Generating a decimating block (downsample by 4)...")
decim_result = await client.call_tool(
name="generate_decim_block",
arguments={
"name": "average_decim",
"description": "Decimate by averaging groups of samples",
"inputs": [{"dtype": "float", "vlen": 1}],
"outputs": [{"dtype": "float", "vlen": 1}],
"decimation": 4,
"parameters": [],
"work_logic": "output_items[0][:] = input_items[0].reshape(-1, 4).mean(axis=1)",
},
)
print(f" ✓ Block generated: {decim_result.data.block_name}")
print(f" ✓ Block class: {decim_result.data.block_class}")
print(f" ✓ Decimation factor: 4")
print()
# ──────────────────────────────────────────
# Step 6: Parse a Protocol Specification
# ──────────────────────────────────────────
print("[Step 6] Parsing a protocol specification...")
# Use the protocol analyzer tools directly
from gnuradio_mcp.middlewares.protocol_analyzer import ProtocolAnalyzerMiddleware
from gnuradio_mcp.prompts import get_protocol_template
analyzer = ProtocolAnalyzerMiddleware()
# Get the LoRa template
lora_spec = get_protocol_template("lora")
protocol = analyzer.parse_protocol_spec(lora_spec)
print(f" ✓ Protocol: {protocol.name}")
print(f" ✓ Modulation: {protocol.modulation.scheme}")
print(f" ✓ Bandwidth: {protocol.modulation.bandwidth}")
if protocol.framing:
print(f" ✓ Sync word: {protocol.framing.sync_word}")
print()
# Generate decoder pipeline
pipeline = analyzer.generate_decoder_chain(protocol)
print(f" ✓ Decoder pipeline: {len(pipeline.blocks)} blocks")
for block in pipeline.blocks:
print(f" - {block.block_name} ({block.block_type})")
print()
# ──────────────────────────────────────────
# Step 7: Export to OOT Module Structure
# ──────────────────────────────────────────
print("[Step 7] Exporting block to OOT module structure...")
from gnuradio_mcp.middlewares.oot_exporter import OOTExporterMiddleware
from gnuradio_mcp.models import GeneratedBlockCode, SignatureItem
exporter = OOTExporterMiddleware()
# Create proper GeneratedBlockCode from our result
block_to_export = GeneratedBlockCode(
source_code=gain_result.data.source_code,
block_name="configurable_gain",
block_class="sync_block",
inputs=[SignatureItem(dtype="float", vlen=1)],
outputs=[SignatureItem(dtype="float", vlen=1)],
is_valid=True,
)
with tempfile.TemporaryDirectory() as tmpdir:
module_dir = Path(tmpdir) / "gr-custom"
# Generate OOT skeleton
skeleton_result = exporter.generate_oot_skeleton(
module_name="custom",
output_dir=str(module_dir),
author="GR-MCP Demo",
)
print(f" ✓ Created OOT skeleton: gr-{skeleton_result.module_name}")
# Export the block
export_result = exporter.export_block_to_oot(
generated=block_to_export,
module_name="custom",
output_dir=str(module_dir),
)
print(f" ✓ Exported block: {export_result.block_name}")
print(f" ✓ Files created: {len(export_result.files_created)}")
for f in export_result.files_created:
print(f" - {f}")
# Show directory structure
print()
print(" OOT Module Structure:")
for path in sorted(module_dir.rglob("*")):
if path.is_file():
rel = path.relative_to(module_dir)
print(f" {rel}")
print()
# ──────────────────────────────────────────
# Step 8: Disable Block Dev Mode
# ──────────────────────────────────────────
print("[Step 8] Disabling block development mode...")
await client.call_tool(name="disable_block_dev_mode")
print(" ✓ Block dev mode disabled")
print()
# ──────────────────────────────────────────
# Summary
# ──────────────────────────────────────────
print("=" * 60)
print(" Demo Complete!")
print("=" * 60)
print()
print(" What we demonstrated:")
print(" 1. Dynamic tool registration (enable_block_dev_mode)")
print(" 2. Sync block generation from templates")
print(" 3. Custom work logic specification")
print(" 4. Independent code validation")
print(" 5. Decimating block generation")
print(" 6. Protocol specification parsing (LoRa)")
print(" 7. Decoder pipeline generation")
print(" 8. OOT module export with YAML generation")
print()
print(" Next steps for real-world use:")
print(" - Use 'test_block_in_docker' to test in isolated containers")
print(" - Connect generated blocks to your flowgraph")
print(" - Build the exported OOT module with 'install_oot_module'")
print()
async def demo_protocol_templates():
"""Show available protocol templates."""
print()
print("=" * 60)
print(" Available Protocol Templates")
print("=" * 60)
print()
from gnuradio_mcp.prompts import list_available_protocols, get_protocol_template
from gnuradio_mcp.middlewares.protocol_analyzer import ProtocolAnalyzerMiddleware
analyzer = ProtocolAnalyzerMiddleware()
for proto_name in list_available_protocols():
template = get_protocol_template(proto_name)
protocol = analyzer.parse_protocol_spec(template)
print(f" {proto_name.upper()}")
print(f" Modulation: {protocol.modulation.scheme}")
if protocol.modulation.symbol_rate:
print(f" Symbol rate: {protocol.modulation.symbol_rate:,.0f} sym/s")
if protocol.modulation.bandwidth:
print(f" Bandwidth: {protocol.modulation.bandwidth:,.0f} Hz")
if protocol.framing and protocol.framing.sync_word:
print(f" Sync word: {protocol.framing.sync_word}")
print()
if __name__ == "__main__":
print()
asyncio.run(demo_block_generation())
asyncio.run(demo_protocol_templates())