Implement the transmit/generate side as streaming GNU Radio blocks, complementing the existing receive chain. Each block maps to a physical instrument on CuriousMarc's Keysight bench: pcm_frame_source - PCM bit stream generator (sync_block + FrameSourceEngine); nrz_encoder - bits to NRZ waveform (+1/-1) with upsampling; bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator; fm_voice_subcarrier_mod - 1.25 MHz FM test tone source; pm_mod - phase modulator: exp(j * deviation * input); usb_signal_source - convenience wrapper wiring all blocks together. Includes GRC YAML definitions for all blocks under the [Apollo USB] category, 49 new tests (271 total, all passing), and a loopback test that validates the full TX->RX round trip, including frame recovery with 30 dB AWGN.
136 lines
4.5 KiB
Python
136 lines
4.5 KiB
Python
"""Tests for the NRZ encoder block."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
# GNU Radio is an optional dependency: record whether it could be imported
# so the tests below can be skipped instead of failing at import time.
try:
    from gnuradio import blocks, gr
except ImportError:
    HAS_GNURADIO = False
else:
    HAS_GNURADIO = True

from apollo.constants import PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND

# Module-level mark: skip every test in this file when the import above failed.
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
|
|
|
|
|
|
class TestNRZEncoder:
    """Test NRZ encoding of bit streams to baseband waveforms.

    The flowgraph-based tests push a short bit vector through
    ``vector_source_b -> nrz_encoder -> vector_sink_f`` and inspect the float
    samples captured by the sink. A 1-bit is expected to come out as +1.0 and
    a 0-bit as -1.0, each held for ``int(sample_rate / bit_rate)`` samples.
    """

    @staticmethod
    def _run_encoder(bits, bit_rate, sample_rate):
        """Encode ``bits`` with ``nrz_encoder`` and return the sink samples.

        Args:
            bits: List of 0/1 values handed to ``blocks.vector_source_b``.
            bit_rate: Forwarded to ``nrz_encoder`` as ``bit_rate``.
            sample_rate: Forwarded to ``nrz_encoder`` as ``sample_rate``.

        Returns:
            NumPy array built from the ``blocks.vector_sink_f`` data once
            ``tb.run()`` has returned.
        """
        # Imported here rather than at module scope, mirroring the per-test
        # imports this helper replaces; presumably apollo.nrz_encoder needs
        # GNU Radio, which may be missing when the module is collected.
        from apollo.nrz_encoder import nrz_encoder

        tb = gr.top_block()
        src = blocks.vector_source_b(bits)
        enc = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate)
        snk = blocks.vector_sink_f()

        tb.connect(src, enc, snk)
        tb.run()

        return np.array(snk.data())

    def test_bit_one_maps_to_positive(self):
        """A single 1-bit should produce +1.0 repeated for samples_per_bit."""
        samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)

        output = self._run_encoder([1], PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND)

        assert len(output) == samples_per_bit
        np.testing.assert_allclose(output, 1.0, atol=1e-6)

    def test_bit_zero_maps_to_negative(self):
        """A single 0-bit should produce -1.0 repeated for samples_per_bit."""
        samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)

        output = self._run_encoder([0], PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND)

        assert len(output) == samples_per_bit
        np.testing.assert_allclose(output, -1.0, atol=1e-6)

    def test_alternating_bits(self):
        """Alternating [1,0,1,0] should produce +1*N, -1*N, +1*N, -1*N."""
        samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
        bits = [1, 0, 1, 0]
        expected_levels = [1.0, -1.0, 1.0, -1.0]

        output = self._run_encoder(bits, PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND)

        # Check the length first: slicing a too-short output yields empty
        # segments, and assert_allclose on an empty array against a scalar
        # passes vacuously, which would hide a broken encoder.
        assert len(output) == len(bits) * samples_per_bit

        for i, (bit, level) in enumerate(zip(bits, expected_levels)):
            start = i * samples_per_bit
            end = start + samples_per_bit
            np.testing.assert_allclose(
                output[start:end], level, atol=1e-6,
                err_msg=f"Bit {i} (value {bit}): expected {level}",
            )

    def test_output_length(self):
        """4 bits at 51200/5120000 (100 samp/bit) should produce 400 samples."""
        bits = [1, 0, 1, 1]
        # Derived from the vector itself so the two cannot drift apart.
        n_bits = len(bits)
        samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)  # 100

        output = self._run_encoder(bits, PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND)

        assert len(output) == n_bits * samples_per_bit

    def test_upsampling_ratio(self):
        """Each NRZ level should be held for exactly samples_per_bit samples."""
        # Use a different rate pair to verify generality: 1600 bps at 5.12 MHz
        # gives 3200 samples per bit
        bit_rate = 1600
        sample_rate = SAMPLE_RATE_BASEBAND
        samples_per_bit = int(sample_rate / bit_rate)  # 3200
        bits = [1, 0]

        output = self._run_encoder(bits, bit_rate, sample_rate)

        assert len(output) == len(bits) * samples_per_bit

        # First bit (1) -> +1.0 held for samples_per_bit
        np.testing.assert_allclose(output[:samples_per_bit], 1.0, atol=1e-6)
        # Second bit (0) -> -1.0 held for samples_per_bit
        np.testing.assert_allclose(output[samples_per_bit:], -1.0, atol=1e-6)

    def test_block_instantiation(self):
        """Block should instantiate with default parameters."""
        from apollo.nrz_encoder import nrz_encoder

        enc = nrz_encoder()

        assert enc is not None
        # The defaults are expected to match the high-bit-rate / baseband pair.
        assert enc.samples_per_bit == int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
|