Implements complete workflow for generating GNU Radio blocks from descriptions: Block Generation: - generate_sync_block, generate_basic_block, generate_interp_block, generate_decim_block tools for creating different block types - Template-based code generation with customizable work logic - Automatic validation via AST parsing and signature checking Protocol Analysis: - Parse protocol specifications into structured models - Generate decoder pipelines matching modulation to demodulator blocks - Templates for BLE, Zigbee, LoRa, POCSAG, ADS-B protocols OOT Export: - Export generated blocks to full OOT module structure - Generate CMakeLists.txt, block YAML, Python modules - gr_modtool-compatible output Dynamic Tool Registration: - enable_block_dev_mode/disable_block_dev_mode for context management - Tools only registered when needed (reduces LLM context usage) Includes comprehensive test coverage and end-to-end demo.
311 lines
13 KiB
Python
311 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""End-to-End Demo: AI-Assisted Block Development Workflow
|
|
|
|
This example demonstrates the complete workflow for developing custom
|
|
GNU Radio blocks using gr-mcp's AI-assisted block development tools:
|
|
|
|
1. Describe a signal processing need
|
|
2. Generate block code from the description
|
|
3. Validate the generated code
|
|
4. Test the block (optionally in Docker)
|
|
5. Export to a full OOT module
|
|
|
|
Run this example with:
|
|
python examples/block_dev_demo.py
|
|
|
|
Or use the MCP tools interactively with Claude:
|
|
claude -p "Enable block dev mode and generate a gain block"
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from fastmcp import Client
|
|
|
|
# Import the MCP app
|
|
from main import app as mcp_app
|
|
|
|
|
|
def _status(ok):
    """Return the label printed for a validation flag."""
    return "PASSED" if ok else "FAILED"


def _print_code_preview(source_code, max_lines=15):
    """Print the leading lines of generated source between two rules.

    Args:
        source_code: Full text of the generated block.
        max_lines: Number of leading lines to show.

    The ``...`` marker is printed only when lines were actually omitted,
    matching how the tool list in Step 1 is truncated.
    """
    lines = source_code.split("\n")
    rule = " " + "-" * 50

    print(" Generated code preview:")
    print(rule)
    for line in lines[:max_lines]:
        print(f" {line}")
    if len(lines) > max_lines:
        print(" ...")
    print(rule)
    print()


async def _enable_block_dev_mode(client, max_listed=5):
    """Step 1: enable block dev mode and list the tools it registered."""
    print("[Step 1] Enabling block development mode...")

    result = await client.call_tool(name="enable_block_dev_mode")
    tools = result.data.tools_registered
    print(" ✓ Block dev mode enabled")
    print(f" ✓ Registered {len(tools)} tools:")
    for tool in tools[:max_listed]:
        print(f" - {tool}")
    if len(tools) > max_listed:
        print(f" ... and {len(tools) - max_listed} more")
    print()


async def _generate_gain_block(client):
    """Step 2: generate a gain block using the ``gain`` work template.

    Returns:
        The ``generate_sync_block`` tool result. Later steps read
        ``.data.source_code`` from it for validation and export.
    """
    print("[Step 2] Generating a configurable gain block...")

    gain_result = await client.call_tool(
        name="generate_sync_block",
        arguments={
            "name": "configurable_gain",
            "description": "Multiply input samples by a configurable gain factor",
            "inputs": [{"dtype": "float", "vlen": 1}],
            "outputs": [{"dtype": "float", "vlen": 1}],
            "parameters": [
                {"name": "gain", "dtype": "float", "default": 1.0}
            ],
            "work_template": "gain",
        },
    )

    print(f" ✓ Block generated: {gain_result.data.block_name}")
    print(f" ✓ Validation: {_status(gain_result.data.is_valid)}")
    print()
    _print_code_preview(gain_result.data.source_code)
    return gain_result


async def _generate_threshold_block(client):
    """Step 3: generate a threshold detector from custom work logic."""
    print("[Step 3] Generating a threshold detector block...")

    # The snippet is spelled out line by line so its own indentation is
    # explicit and independent of how this function is indented.
    work_logic = (
        "\n"
        "# Threshold with hysteresis\n"
        "upper = self.threshold + self.hysteresis\n"
        "lower = self.threshold - self.hysteresis\n"
        "for i in range(len(input_items[0])):\n"
        "    if input_items[0][i] > upper:\n"
        "        output_items[0][i] = 1.0\n"
        "    elif input_items[0][i] < lower:\n"
        "        output_items[0][i] = 0.0\n"
        "    else:\n"
        "        # Maintain previous state (simplified: use 0)\n"
        "        output_items[0][i] = 0.0\n"
    )

    threshold_result = await client.call_tool(
        name="generate_sync_block",
        arguments={
            "name": "threshold_detector",
            "description": "Output 1.0 when input exceeds threshold, else 0.0",
            "inputs": [{"dtype": "float", "vlen": 1}],
            "outputs": [{"dtype": "float", "vlen": 1}],
            "parameters": [
                {"name": "threshold", "dtype": "float", "default": 0.5},
                {"name": "hysteresis", "dtype": "float", "default": 0.1},
            ],
            "work_logic": work_logic,
        },
    )

    print(f" ✓ Block generated: {threshold_result.data.block_name}")
    print(f" ✓ Validation: {_status(threshold_result.data.is_valid)}")
    print()


async def _validate_independently(client, source_code):
    """Step 4: run ``validate_block_code`` on already-generated source."""
    print("[Step 4] Independent code validation...")

    validation = await client.call_tool(
        name="validate_block_code",
        arguments={"source_code": source_code},
    )

    print(f" ✓ Syntax check: {_status(validation.data.is_valid)}")
    # An empty or missing warning list simply prints nothing.
    for warn in validation.data.warnings or ():
        print(f" ⚠ Warning: {warn}")
    print()


async def _generate_decim_block(client, decimation=4):
    """Step 5: generate an averaging decimator.

    Args:
        client: Connected MCP client.
        decimation: Downsampling factor. It feeds the tool argument, the
            reshape width in the work logic and the printed messages, so
            the three can no longer drift apart.
    """
    print(f"[Step 5] Generating a decimating block (downsample by {decimation})...")

    decim_result = await client.call_tool(
        name="generate_decim_block",
        arguments={
            "name": "average_decim",
            "description": "Decimate by averaging groups of samples",
            "inputs": [{"dtype": "float", "vlen": 1}],
            "outputs": [{"dtype": "float", "vlen": 1}],
            "decimation": decimation,
            "parameters": [],
            "work_logic": (
                "output_items[0][:] = "
                f"input_items[0].reshape(-1, {decimation}).mean(axis=1)"
            ),
        },
    )

    print(f" ✓ Block generated: {decim_result.data.block_name}")
    print(f" ✓ Block class: {decim_result.data.block_class}")
    print(f" ✓ Decimation factor: {decimation}")
    print()


def _analyze_lora_protocol():
    """Step 6: parse the LoRa template and print its decoder pipeline."""
    print("[Step 6] Parsing a protocol specification...")

    # Use the protocol analyzer directly rather than through MCP tools.
    from gnuradio_mcp.middlewares.protocol_analyzer import ProtocolAnalyzerMiddleware
    from gnuradio_mcp.prompts import get_protocol_template

    analyzer = ProtocolAnalyzerMiddleware()
    protocol = analyzer.parse_protocol_spec(get_protocol_template("lora"))

    print(f" ✓ Protocol: {protocol.name}")
    print(f" ✓ Modulation: {protocol.modulation.scheme}")
    print(f" ✓ Bandwidth: {protocol.modulation.bandwidth}")
    if protocol.framing:
        print(f" ✓ Sync word: {protocol.framing.sync_word}")
    print()

    pipeline = analyzer.generate_decoder_chain(protocol)
    print(f" ✓ Decoder pipeline: {len(pipeline.blocks)} blocks")
    for block in pipeline.blocks:
        print(f" - {block.block_name} ({block.block_type})")
    print()


def _export_to_oot(source_code):
    """Step 7: write an OOT skeleton plus the block into a temp directory.

    Args:
        source_code: Generated source of the gain block to export.

    The directory is a ``TemporaryDirectory``, so everything written here
    is removed again when the step finishes.
    """
    print("[Step 7] Exporting block to OOT module structure...")

    from gnuradio_mcp.middlewares.oot_exporter import OOTExporterMiddleware
    from gnuradio_mcp.models import GeneratedBlockCode, SignatureItem

    exporter = OOTExporterMiddleware()

    # NOTE(review): name, class, signature and validity are restated by hand
    # instead of being copied from the Step 2 result - confirm they stay in
    # sync with what generate_sync_block actually returned.
    block_to_export = GeneratedBlockCode(
        source_code=source_code,
        block_name="configurable_gain",
        block_class="sync_block",
        inputs=[SignatureItem(dtype="float", vlen=1)],
        outputs=[SignatureItem(dtype="float", vlen=1)],
        is_valid=True,
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        module_dir = Path(tmpdir) / "gr-custom"

        skeleton_result = exporter.generate_oot_skeleton(
            module_name="custom",
            output_dir=str(module_dir),
            author="GR-MCP Demo",
        )
        print(f" ✓ Created OOT skeleton: gr-{skeleton_result.module_name}")

        export_result = exporter.export_block_to_oot(
            generated=block_to_export,
            module_name="custom",
            output_dir=str(module_dir),
        )
        print(f" ✓ Exported block: {export_result.block_name}")
        print(f" ✓ Files created: {len(export_result.files_created)}")
        for created in export_result.files_created:
            print(f" - {created}")

        # List the tree while the temporary directory still exists.
        print()
        print(" OOT Module Structure:")
        for path in sorted(module_dir.rglob("*")):
            if path.is_file():
                print(f" {path.relative_to(module_dir)}")

    print()


async def _disable_block_dev_mode(client):
    """Step 8: turn block dev mode back off."""
    print("[Step 8] Disabling block development mode...")

    await client.call_tool(name="disable_block_dev_mode")
    print(" ✓ Block dev mode disabled")
    print()


def _print_summary():
    """Print the closing recap and the suggested next steps."""
    demonstrated = (
        " 1. Dynamic tool registration (enable_block_dev_mode)",
        " 2. Sync block generation from templates",
        " 3. Custom work logic specification",
        " 4. Independent code validation",
        " 5. Decimating block generation",
        " 6. Protocol specification parsing (LoRa)",
        " 7. Decoder pipeline generation",
        " 8. OOT module export with YAML generation",
    )
    next_steps = (
        " - Use 'test_block_in_docker' to test in isolated containers",
        " - Connect generated blocks to your flowgraph",
        " - Build the exported OOT module with 'install_oot_module'",
    )

    print("=" * 60)
    print(" Demo Complete!")
    print("=" * 60)
    print()
    print(" What we demonstrated:")
    for item in demonstrated:
        print(item)
    print()
    print(" Next steps for real-world use:")
    for item in next_steps:
        print(item)
    print()


async def demo_block_generation():
    """Demonstrate the complete block generation workflow.

    Connects an in-process client to the MCP app and runs eight steps:
    enable block dev mode, generate a gain block, generate a threshold
    detector, validate the gain block's source, generate a decimator,
    parse the LoRa protocol template, export the gain block to an OOT
    module, and disable block dev mode again. Progress is reported with
    ``print``; nothing is returned.

    Once block dev mode has been enabled, it is disabled in a ``finally``
    clause, so a failure in any middle step does not leave the
    development tools registered on the app.
    """
    print("=" * 60)
    print(" GR-MCP Block Development Demo")
    print(" Complete Workflow: Generate -> Validate -> Test -> Export")
    print("=" * 60)
    print()

    async with Client(mcp_app) as client:
        # Enabling stays outside the try: if it fails there is nothing to undo.
        await _enable_block_dev_mode(client)
        try:
            gain_result = await _generate_gain_block(client)
            await _generate_threshold_block(client)
            await _validate_independently(client, gain_result.data.source_code)
            await _generate_decim_block(client)
            _analyze_lora_protocol()
            _export_to_oot(gain_result.data.source_code)
        finally:
            await _disable_block_dev_mode(client)

    _print_summary()
|
|
|
|
|
|
async def demo_protocol_templates():
    """Print a short summary of every bundled protocol template.

    Each template name from ``list_available_protocols()`` is parsed with
    ``ProtocolAnalyzerMiddleware.parse_protocol_spec`` and its modulation
    scheme is printed. Symbol rate, bandwidth and sync word are printed
    only when the parsed value is truthy.
    """
    rule = "=" * 60

    print()
    print(rule)
    print(" Available Protocol Templates")
    print(rule)
    print()

    # Imported here, after the header, so the project packages are only
    # loaded when this demo actually runs.
    from gnuradio_mcp.prompts import list_available_protocols, get_protocol_template
    from gnuradio_mcp.middlewares.protocol_analyzer import ProtocolAnalyzerMiddleware

    spec_parser = ProtocolAnalyzerMiddleware()

    for key in list_available_protocols():
        parsed = spec_parser.parse_protocol_spec(get_protocol_template(key))
        modulation = parsed.modulation

        print(f" {key.upper()}")
        print(f" Modulation: {modulation.scheme}")

        symbol_rate = modulation.symbol_rate
        if symbol_rate:
            print(f" Symbol rate: {symbol_rate:,.0f} sym/s")

        bandwidth = modulation.bandwidth
        if bandwidth:
            print(f" Bandwidth: {bandwidth:,.0f} Hz")

        framing = parsed.framing
        if framing and framing.sync_word:
            print(f" Sync word: {framing.sync_word}")

        print()
|
|
|
|
|
|
if __name__ == "__main__":
    print()
    # Run the demos back to back, each in its own event loop.
    for demo in (demo_block_generation, demo_protocol_templates):
        asyncio.run(demo())
|