Add PRN ranging system: code generator, modulator, demodulator, and demo

Implements the Apollo composite PRN ranging code (5,456,682 chips) from
five component sequences (CL, X, A, B, C) combined via majority-vote
logic, matching Ken Shirriff's Teensy rangeGenerator.ino bit-for-bit.

LFSR taps corrected to produce maximal-length sequences:
  A: 5-bit, taps [2,0] (x^5+x^2+1, period 31)
  B: 6-bit, taps [1,0] (x^6+x+1, period 63)
  C: 7-bit, taps [1,0] (x^7+x+1, period 127)

New files:
  src/apollo/ranging.py          -- pure-Python code generator and correlator
  src/apollo/ranging_source.py   -- GR sync_block streaming PRN chips
  src/apollo/ranging_mod.py      -- GR hier_block2 NRZ chip modulator
  src/apollo/ranging_demod.py    -- GR basic_block FFT-based range correlator
  grc/apollo_ranging_*.block.yml -- GRC block definitions (3 files)
  examples/ranging_demo.py       -- standalone demo with delay simulation
This commit is contained in:
Ryan Malloy 2026-02-24 14:21:02 -07:00
parent 04a17899fc
commit 86a5b08e9d
8 changed files with 1039 additions and 0 deletions

204
examples/ranging_demo.py Normal file
View File

@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Apollo PRN Ranging Demo -- generate, delay, correlate, measure.
Demonstrates the Apollo ranging system by:
1. Verifying component code properties (lengths, periodicity, balance)
2. Generating the PRN ranging code
3. NRZ encoding to a bipolar waveform
4. Applying a known propagation delay (simulating spacecraft distance)
5. Optionally adding AWGN noise
6. Cross-correlating to recover the delay
7. Comparing measured range to true range
The demo works at chip rate (1 sample per chip) for simplicity and speed.
No GNU Radio runtime is required.
Usage:
uv run python examples/ranging_demo.py
uv run python examples/ranging_demo.py --range-km 100
uv run python examples/ranging_demo.py --range-km 384400 # Moon distance
uv run python examples/ranging_demo.py --snr 20
uv run python examples/ranging_demo.py --chips 200000
"""
import argparse
import time
import numpy as np
from apollo.ranging import (
RANGING_A_LENGTH,
RANGING_B_LENGTH,
RANGING_C_LENGTH,
RANGING_CHIP_RATE_HZ,
RANGING_CL_LENGTH,
RANGING_X_LENGTH,
SPEED_OF_LIGHT_M_S,
RangingCodeGenerator,
RangingCorrelator,
range_m_to_chips,
verify_code_properties,
)
def main():
parser = argparse.ArgumentParser(description="Apollo PRN ranging demo")
parser.add_argument(
"--range-km",
type=float,
default=100.0,
help="Target range in km (default: 100)",
)
parser.add_argument(
"--snr",
type=float,
default=None,
help="SNR in dB (default: no noise)",
)
parser.add_argument(
"--chips",
type=int,
default=50_000,
help="Number of chips for correlation (default: 50000)",
)
args = parser.parse_args()
print("=" * 60)
print("Apollo PRN Ranging Demo")
print("=" * 60)
# ------------------------------------------------------------------
# Step 1: Verify code properties
# ------------------------------------------------------------------
print()
print("1. Component code verification")
print("-" * 40)
props = verify_code_properties()
for name in ("cl", "x", "a", "b", "c"):
p = props[name]
status = "OK" if (p["length_correct"] and p["periodic"]) else "FAIL"
balance = ""
if "balance_correct" in p:
balance = f", balance={'OK' if p['balance_correct'] else 'FAIL'}"
print(
f" {name.upper():>2}: length={p['length']:>3} "
f"(1s={p['ones_count']}, 0s={p['zeros_count']}), "
f"periodic={p['periodic']}{balance} [{status}]"
)
lp = props["length_product"]
print(
f" Code length: {lp['expected']:,} "
f"= {RANGING_CL_LENGTH}*{RANGING_X_LENGTH}*{RANGING_A_LENGTH}"
f"*{RANGING_B_LENGTH}*{RANGING_C_LENGTH} "
f"[{'OK' if lp['matches_constant'] else 'FAIL'}]"
)
cs = props["composite_sample"]
print(f" Composite sample ({cs['length']:,} chips): balance={cs['balance']:.3f} (ideal ~0.5)")
# ------------------------------------------------------------------
# Step 2: Generate PRN code
# ------------------------------------------------------------------
print()
print(f"2. Generating {args.chips:,} PRN chips...")
t0 = time.time()
gen = RangingCodeGenerator()
code = gen.generate_sequence(n_chips=args.chips)
t_gen = time.time() - t0
print(f" Generated in {t_gen * 1000:.1f} ms")
print(
f" Ones: {int(np.sum(code)):,} / {args.chips:,} ({100 * np.sum(code) / args.chips:.1f}%)"
)
# ------------------------------------------------------------------
# Step 3: NRZ encode
# ------------------------------------------------------------------
nrz = code.astype(np.float32) * 2.0 - 1.0
# ------------------------------------------------------------------
# Step 4: Apply delay
# ------------------------------------------------------------------
true_range_m = args.range_km * 1000.0
delay_chips = range_m_to_chips(true_range_m, two_way=True)
delay_samples = int(round(delay_chips)) # At chip rate, 1 sample = 1 chip
# Wrap delay within sequence length
delay_samples = delay_samples % args.chips
print()
print(f"3. Simulating range: {args.range_km:.1f} km")
print(f" Round-trip distance: {true_range_m * 2 / 1000:.1f} km")
print(f" Round-trip time: {2 * true_range_m / SPEED_OF_LIGHT_M_S * 1000:.4f} ms")
print(f" Delay: {delay_chips:.2f} chips ({delay_samples} samples)")
received = np.roll(nrz, delay_samples)
# ------------------------------------------------------------------
# Step 5: Add noise
# ------------------------------------------------------------------
if args.snr is not None:
noise_power = 1.0 / (10.0 ** (args.snr / 10.0))
noise = np.random.default_rng(42).standard_normal(len(received)).astype(np.float32)
noise *= np.sqrt(noise_power)
received = received + noise
print(f" Added AWGN noise at {args.snr:.0f} dB SNR (noise power = {noise_power:.4f})")
# ------------------------------------------------------------------
# Step 6: Correlate
# ------------------------------------------------------------------
print()
print("4. Cross-correlating...")
correlator = RangingCorrelator(
chip_rate=RANGING_CHIP_RATE_HZ,
sample_rate=RANGING_CHIP_RATE_HZ, # 1 sample per chip
two_way=True,
)
t0 = time.time()
result = correlator.correlate(received)
t_corr = time.time() - t0
measured_range_m = result["range_m"]
error_m = abs(measured_range_m - true_range_m)
# The quantization error at chip rate:
# one chip = c / (2 * chip_rate) meters (two-way)
quant_m = SPEED_OF_LIGHT_M_S / (2.0 * RANGING_CHIP_RATE_HZ)
print(f" Correlation time: {t_corr * 1000:.1f} ms")
print(f" Peak-to-average ratio: {result['peak_to_avg_ratio']:.1f}")
# ------------------------------------------------------------------
# Step 7: Results
# ------------------------------------------------------------------
print()
print("5. Range measurement results")
print("-" * 40)
print(f" True range: {args.range_km:>12.1f} km")
print(f" Measured range: {measured_range_m / 1000:>12.1f} km")
print(f" Error: {error_m:>12.1f} m")
print(f" Quantization step: {quant_m:>12.1f} m (1 chip, two-way)")
print(f" Delay (chips): {result['delay_chips']:>12.2f}")
print(f" Delay (samples): {result['delay_samples']:>12d}")
print(f" Correlation peak: {result['correlation_peak']:>12.0f}")
if error_m <= quant_m:
print()
print(" Error is within one quantization step -- measurement is correct.")
elif error_m <= quant_m * 2:
print()
print(" Error is within two quantization steps -- acceptable.")
else:
print()
print(" Error exceeds quantization limit. Try more --chips or higher --snr.")
print()
print("=" * 60)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,64 @@
id: apollo_ranging_demod
label: Apollo Ranging Demodulator
category: '[Apollo USB]'
flags: [python]
parameters:
- id: chip_rate
label: Chip Rate (Hz)
dtype: int
default: '993963'
- id: sample_rate
label: Sample Rate (Hz)
dtype: real
default: '5120000'
- id: correlation_length
label: Correlation Length (samples)
dtype: int
default: '100000'
- id: two_way
label: Two-Way Ranging
dtype: bool
default: 'True'
options: ['True', 'False']
option_labels: ['Two-way (ground-SC-ground)', 'One-way']
inputs:
- label: in
domain: stream
dtype: float
outputs:
- label: range
domain: message
templates:
imports: from apollo.ranging_demod import ranging_demod
make: >-
apollo.ranging_demod.ranging_demod(
chip_rate=${chip_rate},
sample_rate=${sample_rate},
correlation_length=${correlation_length},
two_way=${two_way})
documentation: |-
Apollo Ranging Demodulator
Correlates received signal against known PRN ranging code to measure
spacecraft range. Accumulates samples in batches, performs FFT-based
cross-correlation, and emits range measurement PDUs.
Message output (range):
delay_chips: Measured delay in chip periods
range_m: Computed range in meters
correlation_peak: Peak correlation value
peak_to_avg_ratio: Peak-to-average ratio (signal quality)
timestamp: Measurement timestamp
Parameters:
chip_rate: Expected PRN chip rate (default 993,963 Hz)
sample_rate: Input sample rate (default 5.12 MHz)
correlation_length: Samples per correlation batch (default 100,000)
two_way: If True, divide range by 2 for round-trip measurement
file_format: 1

View File

@ -0,0 +1,46 @@
id: apollo_ranging_mod
label: Apollo Ranging Modulator
category: '[Apollo USB]'
flags: [python]
parameters:
- id: chip_rate
label: Chip Rate (Hz)
dtype: int
default: '993963'
- id: sample_rate
label: Sample Rate (Hz)
dtype: real
default: '5120000'
inputs:
- label: in
domain: stream
dtype: byte
outputs:
- label: out
domain: stream
dtype: float
templates:
imports: from apollo.ranging_mod import ranging_mod
make: apollo.ranging_mod.ranging_mod(chip_rate=${chip_rate}, sample_rate=${sample_rate})
documentation: |-
Apollo Ranging Modulator
NRZ-encodes PRN ranging chips (byte 0/1) to float (+1/-1) at the
specified sample rate. Output is suitable for summing with other
subcarriers before PM carrier modulation.
At the default 5.12 MHz sample rate, each chip occupies ~5 samples.
This is the same NRZ transform as the PCM NRZ Encoder but configured
for the ranging chip rate (~994 kchip/s) instead of the PCM bit rate.
Parameters:
chip_rate: PRN chip rate (default 993,963 Hz)
sample_rate: Output sample rate (default 5.12 MHz)
file_format: 1

View File

@ -0,0 +1,30 @@
id: apollo_ranging_source
label: Apollo Ranging Source
category: '[Apollo USB]'
flags: [python]
parameters: []
outputs:
- label: out
domain: stream
dtype: byte
templates:
imports: from apollo.ranging_source import ranging_source
make: apollo.ranging_source.ranging_source()
documentation: |-
Apollo PRN Ranging Source
Generates a continuous stream of PRN ranging code chips (bytes 0 or 1).
The composite code combines CL, X, A, B, and C component sequences
using majority-vote logic: output = (NOT(X) AND maj(A,B,C)) XOR CL
Code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec)
The code repeats cyclically.
The full code period is pre-generated at startup, so the first call
has a brief delay while the sequence is computed.
file_format: 1

456
src/apollo/ranging.py Normal file
View File

@ -0,0 +1,456 @@
"""
Apollo PRN Ranging Code Generator.
Generates the composite pseudo-random noise code used for spacecraft range
measurement. The code combines 5 component sequences (CL, X, A, B, C) using
majority-vote logic and XOR operations.
Combined code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec)
The composite output is produced by Shirriff's algorithm: on even-numbered
output chips (ck=0), all shift registers advance and the output is computed
from the feedback bits via majority logic. On odd-numbered chips (ck=1), the
output is simply flipped (XOR with CL clock).
Combination logic (per even chip):
out = (NOT(xnew) AND maj(anew, bnew, cnew)) XOR ck
where maj = (A&B) | (A&C) | (B&C)
Component LFSR taps (from Shirriff's Teensy rangeGenerator.ino):
A: 5-bit, taps at bit positions 2 and 0 (polynomial x^5+x^2+1)
B: 6-bit, taps at bit positions 1 and 0 (polynomial x^6+x+1)
C: 7-bit, taps at bit positions 1 and 0 (polynomial x^7+x+1)
Reference: Ken Shirriff's analysis of the Apollo ranging system
http://www.righto.com/2022/04/the-digital-ranging-system-that.html
https://github.com/shirriff/apollo-ranging
"""
import numpy as np
# ---------------------------------------------------------------------------
# Ranging constants (local to this module; will migrate to constants.py)
# ---------------------------------------------------------------------------
RANGING_CHIP_RATE_HZ = 993_963
RANGING_CODE_LENGTH = 5_456_682 # 2 * 11 * 31 * 63 * 127
RANGING_CL_LENGTH = 2
RANGING_X_LENGTH = 11
RANGING_A_LENGTH = 31
RANGING_B_LENGTH = 63
RANGING_C_LENGTH = 127
RANGING_X_INIT = 22 # 0b10110
RANGING_A_INIT = 0x1F # all ones (5-bit)
RANGING_B_INIT = 0x3F # all ones (6-bit)
RANGING_C_INIT = 0x7F # all ones (7-bit)
# LFSR taps: zero-indexed bit positions for XOR feedback, per Shirriff's code.
# These produce maximal-length sequences (period = 2^n - 1).
RANGING_A_TAPS = (2, 0) # 5-bit: x^5 + x^2 + 1
RANGING_B_TAPS = (1, 0) # 6-bit: x^6 + x + 1
RANGING_C_TAPS = (1, 0) # 7-bit: x^7 + x + 1
SPEED_OF_LIGHT_M_S = 299_792_458
class RangingCodeGenerator:
"""Generate Apollo PRN ranging code sequences.
Each component code can be generated independently (useful for
sequential correlation in the receiver) or the full composite
sequence can be generated at once.
The component generators produce *feedback bit* sequences, which is
what Shirriff's combination logic uses. These feedback sequences have
the same period and balance properties as the standard LFSR output
sequences.
"""
def generate_cl(self, n_chips: int | None = None) -> np.ndarray:
"""Generate CL component: alternating 0, 1, 0, 1, ...
Args:
n_chips: Number of chips. Default: RANGING_CL_LENGTH (2).
Returns:
uint8 array of 0/1 values.
"""
if n_chips is None:
n_chips = RANGING_CL_LENGTH
return np.array([i % 2 for i in range(n_chips)], dtype=np.uint8)
def generate_x(self, n_chips: int | None = None) -> np.ndarray:
"""Generate X component feedback sequence.
The X code is an 11-chip sequence with non-LFSR feedback.
Register bits: [xg, xf, xf1, xf2, xf3] = [bit4..bit0]
Feedback: xnew = (!xg & !xf2) | (!xf & !xf3) | (!xf1 & xf2 & xf3)
The output is the feedback bit (xnew), not the shift-out bit.
Args:
n_chips: Number of chips. Default: RANGING_X_LENGTH (11).
Returns:
uint8 array of 0/1 values.
"""
if n_chips is None:
n_chips = RANGING_X_LENGTH
state = RANGING_X_INIT # 22 = 0b10110
chips = np.zeros(n_chips, dtype=np.uint8)
for i in range(n_chips):
# Extract individual bits
xg = (state >> 4) & 1 # bit 4 (MSB)
xf = (state >> 3) & 1 # bit 3
xf1 = (state >> 2) & 1 # bit 2
xf2 = (state >> 1) & 1 # bit 1
xf3 = state & 1 # bit 0
# Custom feedback per Shirriff's Teensy rangeGenerator.ino
xnew = ((xg ^ 1) & (xf2 ^ 1)) | ((xf ^ 1) & (xf3 ^ 1)) | ((xf1 ^ 1) & xf2 & xf3)
# Output is the feedback bit
chips[i] = xnew & 1
# Shift right, new bit enters at MSB (bit 4)
state = ((xnew & 1) << 4) | (state >> 1)
return chips
def _generate_lfsr_feedback(
self,
n_bits: int,
taps: tuple[int, int],
init: int,
n_chips: int,
) -> np.ndarray:
"""Generate LFSR feedback bit sequence.
Shift direction: right (MSB in, LSB out). XOR feedback from the
two tap positions. Output is the feedback bit (which becomes the
new MSB after shift), matching Shirriff's combination logic.
Args:
n_bits: Register width.
taps: Pair of zero-indexed bit positions for XOR feedback.
init: Initial register state.
n_chips: Number of output chips.
Returns:
uint8 array of 0/1 values (feedback bits).
"""
state = init
mask = (1 << n_bits) - 1
chips = np.zeros(n_chips, dtype=np.uint8)
for i in range(n_chips):
# XOR feedback from tap positions
fb = ((state >> taps[0]) & 1) ^ ((state >> taps[1]) & 1)
# Output is the feedback bit
chips[i] = fb
# Shift right, feedback enters at MSB
state = ((fb << (n_bits - 1)) | (state >> 1)) & mask
return chips
def generate_a(self, n_chips: int | None = None) -> np.ndarray:
"""Generate A component: 31-chip LFSR, 5 bits, taps [2,0]."""
if n_chips is None:
n_chips = RANGING_A_LENGTH
return self._generate_lfsr_feedback(5, RANGING_A_TAPS, RANGING_A_INIT, n_chips)
def generate_b(self, n_chips: int | None = None) -> np.ndarray:
"""Generate B component: 63-chip LFSR, 6 bits, taps [1,0]."""
if n_chips is None:
n_chips = RANGING_B_LENGTH
return self._generate_lfsr_feedback(6, RANGING_B_TAPS, RANGING_B_INIT, n_chips)
def generate_c(self, n_chips: int | None = None) -> np.ndarray:
"""Generate C component: 127-chip LFSR, 7 bits, taps [1,0]."""
if n_chips is None:
n_chips = RANGING_C_LENGTH
return self._generate_lfsr_feedback(7, RANGING_C_TAPS, RANGING_C_INIT, n_chips)
def generate_component(self, name: str, n_chips: int | None = None) -> np.ndarray:
"""Generate a named component sequence.
Args:
name: One of "cl", "x", "a", "b", "c" (case-insensitive).
n_chips: Number of chips (default: one full period).
Returns:
uint8 array of 0/1 values.
Raises:
ValueError: If name is not a recognized component.
"""
generators = {
"cl": self.generate_cl,
"x": self.generate_x,
"a": self.generate_a,
"b": self.generate_b,
"c": self.generate_c,
}
gen = generators.get(name.lower())
if gen is None:
raise ValueError(f"Unknown component: {name!r}. Valid: {list(generators)}")
return gen(n_chips)
def generate_sequence(self, n_chips: int | None = None) -> np.ndarray:
"""Generate the full composite PRN ranging code.
Reproduces Shirriff's calc() function: on even output chips (ck=0),
all shift registers advance and the output is computed from feedback
bits. On odd chips (ck=1), the output is flipped (XOR with clock).
The component feedback sequences are tiled to fill the requested
length. Each component advances once per 2 output chips (CL period),
so component index = output_chip_index // 2.
Args:
n_chips: Total chips to generate.
Default: RANGING_CODE_LENGTH (one full period).
Returns:
uint8 array of 0/1 values, length n_chips.
"""
if n_chips is None:
n_chips = RANGING_CODE_LENGTH
# Number of register advance steps (one per CL period of 2 chips)
n_steps = (n_chips + 1) // 2
# Generate feedback bit sequences for each component, one per step
x_period = self.generate_x()
a_period = self.generate_a()
b_period = self.generate_b()
c_period = self.generate_c()
def _tile(period: np.ndarray, length: int, total: int) -> np.ndarray:
reps = (total + length - 1) // length
return np.tile(period, reps)[:total]
x_fb = _tile(x_period, RANGING_X_LENGTH, n_steps)
a_fb = _tile(a_period, RANGING_A_LENGTH, n_steps)
b_fb = _tile(b_period, RANGING_B_LENGTH, n_steps)
c_fb = _tile(c_period, RANGING_C_LENGTH, n_steps)
# Majority vote on feedback bits: maj(A,B,C) = (A&B) | (A&C) | (B&C)
maj = (a_fb & b_fb) | (a_fb & c_fb) | (b_fb & c_fb)
# Composite per step: (NOT(xnew) AND maj) -- before CL XOR
base = ((x_fb ^ 1) & maj).astype(np.uint8)
# Expand to output chips: each step produces 2 chips.
# Chip 0 (ck=1 after advance): base XOR 1
# Chip 1 (ck=0 before advance): base XOR 0 = base
# Wait -- re-examine Shirriff's calc():
# On entry with ck=0: advance registers, set ck=1, compute out with XOR ck(=1)
# On entry with ck=1: set ck=0, flip out (out ^= 1)
# So the first chip of each pair uses ck=1 (XOR with 1),
# and the second uses the flip (equivalent to XOR with 0).
chip0 = base ^ 1 # ck=1 at time of computation
chip1 = base # flipped = (base ^ 1) ^ 1 = base
# Interleave: [chip0[0], chip1[0], chip0[1], chip1[1], ...]
output = np.empty(n_steps * 2, dtype=np.uint8)
output[0::2] = chip0
output[1::2] = chip1
return output[:n_chips]
class RangingCorrelator:
"""Cross-correlate received NRZ samples with the known PRN code.
Uses FFT-based cross-correlation for efficiency. Works at any sample
rate -- when sample_rate equals chip_rate, each chip is one sample.
"""
def __init__(
self,
chip_rate: int = RANGING_CHIP_RATE_HZ,
sample_rate: float = RANGING_CHIP_RATE_HZ,
two_way: bool = True,
):
self.chip_rate = chip_rate
self.sample_rate = sample_rate
self.two_way = two_way
self.samples_per_chip = sample_rate / chip_rate
self._gen = RangingCodeGenerator()
def correlate(self, received: np.ndarray, code_chips: int | None = None) -> dict:
"""Cross-correlate received samples with PRN code.
Args:
received: Float samples (NRZ-like, +1/-1 or similar).
code_chips: Number of PRN chips for reference.
Default: enough to cover the received samples.
Returns:
Dict with delay_chips, delay_samples, range_m,
correlation_peak, peak_to_avg_ratio.
"""
if code_chips is None:
code_chips = int(len(received) / self.samples_per_chip) + 1
# Generate reference code (at most one full period)
ref_chips = self._gen.generate_sequence(min(code_chips, RANGING_CODE_LENGTH))
# NRZ encode reference: 0 -> -1, 1 -> +1
ref_nrz = ref_chips.astype(np.float32) * 2.0 - 1.0
# Upsample reference to match sample rate
spc = int(self.samples_per_chip)
ref_samples = np.repeat(ref_nrz, spc) if spc > 1 else ref_nrz
# Truncate to match lengths for correlation
min_len = min(len(received), len(ref_samples))
rx = received[:min_len].astype(np.float32)
ref = ref_samples[:min_len]
# Cross-correlate using FFT
n_fft = 1
while n_fft < 2 * min_len:
n_fft *= 2
rx_fft = np.fft.rfft(rx, n_fft)
ref_fft = np.fft.rfft(ref, n_fft)
xcorr = np.fft.irfft(rx_fft * np.conj(ref_fft), n_fft)
# Find peak in the valid region
peak_idx = int(np.argmax(np.abs(xcorr[:min_len])))
peak_val = float(np.abs(xcorr[peak_idx]))
avg_val = float(np.mean(np.abs(xcorr[:min_len])))
# Convert sample delay to chip delay
delay_chips = peak_idx / self.samples_per_chip
range_m = chips_to_range_m(delay_chips, two_way=self.two_way)
return {
"delay_samples": peak_idx,
"delay_chips": delay_chips,
"range_m": range_m,
"correlation_peak": peak_val,
"peak_to_avg_ratio": peak_val / avg_val if avg_val > 0 else 0.0,
}
# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------
def chips_to_range_m(delay_chips: float, two_way: bool = True) -> float:
"""Convert chip delay to range in meters.
Args:
delay_chips: Delay measured in chip periods.
two_way: If True, signal traveled ground -> spacecraft -> ground.
Returns:
Range in meters.
"""
chip_period_s = 1.0 / RANGING_CHIP_RATE_HZ
delay_s = delay_chips * chip_period_s
distance = SPEED_OF_LIGHT_M_S * delay_s
if two_way:
distance /= 2.0
return distance
def range_m_to_chips(range_m: float, two_way: bool = True) -> float:
"""Convert range in meters to chip delay.
Args:
range_m: Distance in meters.
two_way: If True, compute round-trip delay.
Returns:
Delay in chip periods.
"""
distance = range_m * 2.0 if two_way else range_m
delay_s = distance / SPEED_OF_LIGHT_M_S
return delay_s * RANGING_CHIP_RATE_HZ
def verify_code_properties() -> dict:
"""Verify all component codes have correct properties.
Checks length, periodicity, and balance for each component and the
full composite sequence.
Returns:
Dict with component names as keys and property dicts as values.
"""
gen = RangingCodeGenerator()
results = {}
components = {
"cl": {"length": RANGING_CL_LENGTH, "n_bits": None},
"x": {"length": RANGING_X_LENGTH, "n_bits": 5},
"a": {"length": RANGING_A_LENGTH, "n_bits": 5},
"b": {"length": RANGING_B_LENGTH, "n_bits": 6},
"c": {"length": RANGING_C_LENGTH, "n_bits": 7},
}
for name, props in components.items():
code = gen.generate_component(name)
exp_len = props["length"]
result = {
"length": len(code),
"length_correct": len(code) == exp_len,
"ones_count": int(np.sum(code)),
"zeros_count": int(exp_len - np.sum(code)),
}
# Check periodicity: 2x generation should repeat exactly
code_2x = gen.generate_component(name, exp_len * 2)
result["periodic"] = bool(np.array_equal(code_2x[:exp_len], code_2x[exp_len:]))
# For maximal-length LFSRs, verify balance: ones = 2^(n-1), zeros = 2^(n-1)-1
if name in ("a", "b", "c") and props["n_bits"] is not None:
n = props["n_bits"]
expected_ones = 2 ** (n - 1)
expected_zeros = 2 ** (n - 1) - 1
result["balance_correct"] = (
result["ones_count"] == expected_ones and result["zeros_count"] == expected_zeros
)
# CL should be perfectly balanced
if name == "cl":
result["balance_correct"] = result["ones_count"] == 1 and result["zeros_count"] == 1
results[name] = result
# Verify combined length is the product of all component lengths
product = (
RANGING_CL_LENGTH
* RANGING_X_LENGTH
* RANGING_A_LENGTH
* RANGING_B_LENGTH
* RANGING_C_LENGTH
)
results["length_product"] = {
"expected": product,
"matches_constant": product == RANGING_CODE_LENGTH,
}
# Full composite -- generate a shorter segment for speed
# (full 5.4M chips is fine but we verify length math above)
short_len = 10_000
short_seq = gen.generate_sequence(n_chips=short_len)
results["composite_sample"] = {
"length": len(short_seq),
"ones_count": int(np.sum(short_seq)),
"zeros_count": int(short_len - np.sum(short_seq)),
"balance": float(np.sum(short_seq)) / short_len,
}
return results

114
src/apollo/ranging_demod.py Normal file
View File

@ -0,0 +1,114 @@
"""
Apollo Ranging Demodulator -- correlates received signal to measure range.
Takes the PM-demodulated float signal (containing the ranging subcarrier),
cross-correlates with the known PRN code, and outputs range measurements
as PDU messages.
The correlator accumulates a configurable number of samples per batch,
then performs FFT-based cross-correlation against the reference PRN code.
Each correlation batch produces one range measurement PDU.
Reference: Ken Shirriff's Apollo ranging analysis
http://www.righto.com/2022/04/the-digital-ranging-system-that.html
"""
import time
import numpy as np
from apollo.ranging import (
RANGING_CHIP_RATE_HZ,
RangingCorrelator,
)
SAMPLE_RATE_BASEBAND = 5_120_000
# ---------------------------------------------------------------------------
# GNU Radio block (optional -- only if gnuradio is available)
# ---------------------------------------------------------------------------
try:
import pmt
from gnuradio import gr
class ranging_demod(gr.basic_block):
"""GNU Radio block: ranging demodulator with correlation.
Accumulates samples, runs batch correlation, emits range PDUs.
Input: float (PM demod output or filtered ranging signal)
Output: message PDUs with range measurements (port "range")
"""
def __init__(
self,
chip_rate: int = RANGING_CHIP_RATE_HZ,
sample_rate: float = SAMPLE_RATE_BASEBAND,
correlation_length: int = 100_000,
two_way: bool = True,
):
gr.basic_block.__init__(
self,
name="apollo_ranging_demod",
in_sig=[np.float32],
out_sig=None,
)
self.message_port_register_out(pmt.intern("range"))
self._correlator = RangingCorrelator(
chip_rate=chip_rate,
sample_rate=sample_rate,
two_way=two_way,
)
self._buffer = np.array([], dtype=np.float32)
self._correlation_length = correlation_length
def general_work(self, input_items, output_items):
n = len(input_items[0])
samples = np.array(input_items[0][:n], dtype=np.float32)
self.consume(0, n)
self._buffer = np.concatenate([self._buffer, samples])
while len(self._buffer) >= self._correlation_length:
chunk = self._buffer[: self._correlation_length]
self._buffer = self._buffer[self._correlation_length :]
result = self._correlator.correlate(chunk)
meta = pmt.make_dict()
meta = pmt.dict_add(
meta,
pmt.intern("delay_chips"),
pmt.from_double(result["delay_chips"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("range_m"),
pmt.from_double(result["range_m"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("correlation_peak"),
pmt.from_double(result["correlation_peak"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("peak_to_avg_ratio"),
pmt.from_double(result["peak_to_avg_ratio"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("timestamp"),
pmt.from_double(time.time()),
)
pdu = pmt.cons(meta, pmt.PMT_NIL)
self.message_port_pub(pmt.intern("range"), pdu)
return 0
except ImportError:
pass

65
src/apollo/ranging_mod.py Normal file
View File

@ -0,0 +1,65 @@
"""
Apollo Ranging Modulator -- NRZ-encodes PRN chips for carrier modulation.
Converts the chip stream (bytes 0/1) to a float NRZ waveform (+1/-1)
at the baseband sample rate. This output is suitable for summing with
other subcarriers before PM modulation.
The chip rate is ~994 kchip/s, so at 5.12 MHz sample rate there are
approximately 5.15 samples per chip.
This is essentially the same transform as nrz_encoder but configured
for the ranging chip rate instead of the PCM bit rate.
Reference: IMPLEMENTATION_SPEC.md ranging modulation path
"""
from gnuradio import blocks, gr
from apollo.ranging import RANGING_CHIP_RATE_HZ
# Use the standard baseband sample rate
SAMPLE_RATE_BASEBAND = 5_120_000
class ranging_mod(gr.hier_block2):
"""Ranging chip NRZ modulator: byte (0/1) -> float (+1/-1) at sample_rate.
Input: byte stream (chip values 0 or 1)
Output: float NRZ waveform at sample_rate
"""
def __init__(
self,
chip_rate: int = RANGING_CHIP_RATE_HZ,
sample_rate: float = SAMPLE_RATE_BASEBAND,
):
gr.hier_block2.__init__(
self,
"apollo_ranging_mod",
gr.io_signature(1, 1, gr.sizeof_char),
gr.io_signature(1, 1, gr.sizeof_float),
)
self._chip_rate = chip_rate
self._sample_rate = sample_rate
samples_per_chip = int(sample_rate / chip_rate)
# byte (0/1) -> float (0.0/1.0)
self.to_float = blocks.char_to_float(1, 1)
# float (0.0/1.0) -> float (0.0/2.0)
self.scale = blocks.multiply_const_ff(2.0)
# float (0.0/2.0) -> float (-1.0/+1.0)
self.offset = blocks.add_const_ff(-1.0)
# Upsample: repeat each value samples_per_chip times
self.upsample = blocks.repeat(gr.sizeof_float, samples_per_chip)
# Connect chain
self.connect(self, self.to_float, self.scale, self.offset, self.upsample, self)
@property
def samples_per_chip(self) -> int:
return int(self._sample_rate / self._chip_rate)

View File

@ -0,0 +1,60 @@
"""
Apollo Ranging Source -- streams PRN ranging chips.
Outputs a continuous stream of bytes (0 or 1) representing the composite
PRN ranging code. The code repeats every 5,456,682 chips (~5.49 seconds
at the nominal 993,963 chip/s rate).
The full code period is pre-generated and cycled through, so startup cost
is paid once and streaming is zero-allocation.
Reference: Ken Shirriff's Apollo ranging analysis
http://www.righto.com/2022/04/the-digital-ranging-system-that.html
"""
import numpy as np
from apollo.ranging import RANGING_CODE_LENGTH, RangingCodeGenerator
# ---------------------------------------------------------------------------
# GNU Radio block (optional -- only if gnuradio is available)
# ---------------------------------------------------------------------------
try:
from gnuradio import gr
class ranging_source(gr.sync_block):
"""GNU Radio source: continuous PRN ranging chip stream.
Outputs bytes (0 or 1) at the chip rate. Pre-generates the full
code period and cycles through it.
"""
def __init__(self):
gr.sync_block.__init__(
self,
name="apollo_ranging_source",
in_sig=None,
out_sig=[np.byte],
)
self._gen = RangingCodeGenerator()
self._code = self._gen.generate_sequence()
self._pos = 0
def work(self, input_items, output_items):
out = output_items[0]
n = len(out)
produced = 0
while produced < n:
remaining = RANGING_CODE_LENGTH - self._pos
chunk = min(n - produced, remaining)
out[produced : produced + chunk] = self._code[self._pos : self._pos + chunk]
self._pos = (self._pos + chunk) % RANGING_CODE_LENGTH
produced += chunk
return produced
except ImportError:
pass