gr-mcp/src/gnuradio_mcp/prompts/common_patterns.py
Ryan Malloy 5db7d71d2b feat: add AI-assisted block development tools
Implements complete workflow for generating GNU Radio blocks from
descriptions:

Block Generation:
- generate_sync_block, generate_basic_block, generate_interp_block,
  generate_decim_block tools for creating different block types
- Template-based code generation with customizable work logic
- Automatic validation via AST parsing and signature checking

Protocol Analysis:
- Parse protocol specifications into structured models
- Generate decoder pipelines matching modulation to demodulator blocks
- Templates for BLE, Zigbee, LoRa, POCSAG, ADS-B protocols

OOT Export:
- Export generated blocks to full OOT module structure
- Generate CMakeLists.txt, block YAML, Python modules
- gr_modtool-compatible output

Dynamic Tool Registration:
- enable_block_dev_mode/disable_block_dev_mode for context management
- Tools only registered when needed (reduces LLM context usage)

Includes comprehensive test coverage and end-to-end demo.
2026-02-09 12:36:54 -07:00

323 lines
8.9 KiB
Python

"""Common DSP patterns for block generation."""
COMMON_PATTERNS_PROMPT = '''
# Common GNU Radio DSP Patterns
## Signal Processing Primitives
### Moving Average Filter
```python
def __init__(self, window_size=16):
gr.sync_block.__init__(self, ...)
self.window_size = window_size
self._buffer = []
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
self._buffer.append(input_items[0][i])
if len(self._buffer) > self.window_size:
self._buffer.pop(0)
output_items[0][i] = numpy.mean(self._buffer)
return len(output_items[0])
```
### Exponential Moving Average (IIR)
```python
def __init__(self, alpha=0.1):
gr.sync_block.__init__(self, ...)
self.alpha = alpha
self._state = 0.0
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
self._state = self.alpha * input_items[0][i] + (1 - self.alpha) * self._state
output_items[0][i] = self._state
return len(output_items[0])
```
### Peak Detector
```python
def __init__(self, threshold=0.5, decay=0.99):
gr.sync_block.__init__(self, ...)
self.threshold = threshold
self.decay = decay
self._peak = 0.0
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
sample = numpy.abs(input_items[0][i])
if sample > self._peak:
self._peak = sample
else:
self._peak *= self.decay
output_items[0][i] = 1.0 if sample > self._peak * self.threshold else 0.0
return len(output_items[0])
```
### Automatic Gain Control (AGC)
```python
def __init__(self, target=1.0, attack=0.01, decay=0.001):
gr.sync_block.__init__(self,
in_sig=[numpy.complex64],
out_sig=[numpy.complex64], ...)
self.target = target
self.attack = attack
self.decay = decay
self._gain = 1.0
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
sample = input_items[0][i]
mag = numpy.abs(sample)
error = self.target - mag * self._gain
if error > 0:
self._gain += self.attack * error
else:
self._gain += self.decay * error
self._gain = max(0.001, min(1000, self._gain))
output_items[0][i] = sample * self._gain
return len(output_items[0])
```
## Frequency Domain Operations
### Simple FFT Magnitude
```python
def __init__(self, fft_size=1024):
gr.sync_block.__init__(self,
in_sig=[(numpy.complex64, fft_size)],
out_sig=[(numpy.float32, fft_size)], ...)
self.fft_size = fft_size
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
spectrum = numpy.fft.fftshift(numpy.fft.fft(input_items[0][i]))
output_items[0][i] = numpy.abs(spectrum)
return len(output_items[0])
```
### Power Spectral Density
```python
def __init__(self, fft_size=1024, avg_count=10):
gr.sync_block.__init__(self,
in_sig=[(numpy.complex64, fft_size)],
out_sig=[(numpy.float32, fft_size)], ...)
self.fft_size = fft_size
self.avg_count = avg_count
self._psd_sum = numpy.zeros(fft_size)
self._count = 0
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
spectrum = numpy.fft.fftshift(numpy.fft.fft(input_items[0][i]))
psd = numpy.abs(spectrum) ** 2
self._psd_sum += psd
self._count += 1
if self._count >= self.avg_count:
output_items[0][i] = 10 * numpy.log10(self._psd_sum / self._count + 1e-10)
self._psd_sum = numpy.zeros(self.fft_size)
self._count = 0
else:
output_items[0][i] = numpy.zeros(self.fft_size)
return len(output_items[0])
```
## Timing and Synchronization
### Simple Clock Recovery (Zerocrossing)
```python
def __init__(self, samples_per_symbol=8):
gr.basic_block.__init__(self,
in_sig=[numpy.float32],
out_sig=[numpy.float32], ...)
self.sps = samples_per_symbol
self._phase = 0.0
self._last_sample = 0.0
def forecast(self, noutput_items, ninputs):
return [int(noutput_items * self.sps) + 1]
def general_work(self, input_items, output_items):
data = input_items[0]
out = output_items[0]
n_out = 0
n_in = 0
while n_out < len(out) and n_in < len(data) - 1:
# Detect zero crossing for timing adjustment
if data[n_in] * self._last_sample < 0:
# Adjust phase based on crossing position
cross_pos = -self._last_sample / (data[n_in] - self._last_sample)
self._phase += 0.1 * (cross_pos - 0.5)
# Output at symbol center
self._phase += 1.0 / self.sps
if self._phase >= 1.0:
self._phase -= 1.0
out[n_out] = data[n_in]
n_out += 1
self._last_sample = data[n_in]
n_in += 1
self.consume_each(n_in)
return n_out
```
## Packet Detection
### Preamble Correlator
```python
def __init__(self, preamble="10101010"):
gr.sync_block.__init__(self,
in_sig=[numpy.float32],
out_sig=[numpy.float32], ...)
self.preamble = numpy.array([1 if b == '1' else -1 for b in preamble], dtype=numpy.float32)
self._buffer = numpy.zeros(len(preamble))
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
self._buffer = numpy.roll(self._buffer, -1)
self._buffer[-1] = 1.0 if input_items[0][i] > 0 else -1.0
output_items[0][i] = numpy.dot(self._buffer, self.preamble) / len(self.preamble)
return len(output_items[0])
```
### Threshold with Hysteresis
```python
def __init__(self, high_thresh=0.7, low_thresh=0.3):
gr.sync_block.__init__(self, ...)
self.high_thresh = high_thresh
self.low_thresh = low_thresh
self._state = False
def work(self, input_items, output_items):
for i in range(len(input_items[0])):
sample = input_items[0][i]
if self._state:
if sample < self.low_thresh:
self._state = False
else:
if sample > self.high_thresh:
self._state = True
output_items[0][i] = 1.0 if self._state else 0.0
return len(output_items[0])
```
## Data Transformation
### Byte Unpacker (1 byte → 8 bits)
```python
def __init__(self):
gr.basic_block.__init__(self,
in_sig=[numpy.uint8],
out_sig=[numpy.uint8], ...)
def forecast(self, noutput_items, ninputs):
return [(noutput_items + 7) // 8]
def general_work(self, input_items, output_items):
n_bytes = min(len(input_items[0]), len(output_items[0]) // 8)
if n_bytes == 0:
return 0
for i in range(n_bytes):
byte = input_items[0][i]
for bit in range(8):
output_items[0][i * 8 + bit] = (byte >> (7 - bit)) & 1
self.consume_each(n_bytes)
return n_bytes * 8
```
### Byte Packer (8 bits → 1 byte)
```python
def __init__(self):
gr.basic_block.__init__(self,
in_sig=[numpy.uint8],
out_sig=[numpy.uint8], ...)
def forecast(self, noutput_items, ninputs):
return [noutput_items * 8]
def general_work(self, input_items, output_items):
n_bits = len(input_items[0])
n_bytes = min(n_bits // 8, len(output_items[0]))
if n_bytes == 0:
return 0
for i in range(n_bytes):
byte = 0
for bit in range(8):
if input_items[0][i * 8 + bit]:
byte |= (1 << (7 - bit))
output_items[0][i] = byte
self.consume_each(n_bytes * 8)
return n_bytes
```
### Manchester Decoder
```python
def __init__(self):
gr.basic_block.__init__(self,
in_sig=[numpy.uint8], # Binary symbols
out_sig=[numpy.uint8], ...)
def forecast(self, noutput_items, ninputs):
return [noutput_items * 2]
def general_work(self, input_items, output_items):
data = input_items[0]
n_pairs = min(len(data) // 2, len(output_items[0]))
if n_pairs == 0:
return 0
for i in range(n_pairs):
first = data[i * 2]
second = data[i * 2 + 1]
# Manchester: 0→1 = 0, 1→0 = 1
if first == 0 and second == 1:
output_items[0][i] = 0
elif first == 1 and second == 0:
output_items[0][i] = 1
else:
# Invalid Manchester encoding
output_items[0][i] = 255
self.consume_each(n_pairs * 2)
return n_pairs
```
## Performance Tips
1. **Vectorize with numpy** - Avoid Python loops where possible
```python
# Slow
for i in range(len(data)):
output[i] = data[i] * gain
# Fast
output[:] = data * gain
```
2. **Pre-allocate buffers** - Create arrays once in __init__
```python
def __init__(self):
self._buffer = numpy.zeros(1024)
```
3. **Use in-place operations** - Modify arrays without copies
```python
data *= gain # In-place
data = data * gain # Creates copy
```
4. **Minimize state** - Less state = better cache performance
5. **Profile first** - Use `%timeit` or cProfile to find bottlenecks
'''