gr-sarsat-modern/python/sarsat/pds_frame_sync.py
Ryan Malloy 5867c54de3 feat: port gr-sarsat to GNU Radio 3.10+
Ported from zleffke/gr-sarsat (2018, GNU Radio 3.7):
- Updated Python 2 → Python 3
- Converted XML block definitions to YAML
- Added pyproject.toml for modern packaging
- Preserved original MIT license

Blocks:
- biphase_l_decode_bb: Manchester decoder (decim_block)
- pds_frame_sync: Frame synchronizer with PDU output
- sarp_msg_extract: SARP message splitter/validator
2026-02-11 02:22:28 -07:00

113 lines
3.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# MIT License
# Copyright (c) 2018 zleffke
# Ported to GNU Radio 3.10+ by gr-mcp
"""
PDS (Polar-orbiting satellite Data Service) Frame Synchronizer.
Cospas-Sarsat 406 MHz beacons transmit:
- 15-bit frame sync: 0x7FF (all ones, Biphase-L encoded)
- 576-bit PDS frame containing 3 SARP messages
This block:
1. Waits for frame sync tag from upstream correlator
2. Collects the following 576 bits
3. Packs bits into bytes
4. Emits as PDU message
"""
import numpy as np
import pmt
import binascii
from gnuradio import gr
# State machine states
SEARCH = 1
COPY = 2
class pds_frame_sync(gr.sync_block):
"""
PDS Frame Synchronizer.
Expects "Correlate Access Code - Tag" block upstream.
Tag indicates the last bit of the Biphase-L DECODED frame sync.
Operates on unpacked byte stream (one bit per byte).
Input: Biphase-L decoded bit stream with frame sync tags
Output: PDU messages containing 72-byte PDS frames
"""
def __init__(self, tag_name="pds_sync"):
gr.sync_block.__init__(
self,
name="pds_frame_sync",
in_sig=[np.int8],
out_sig=None,
)
self.tag_name = tag_name
self.message_port_register_out(pmt.intern("out"))
self.len_encoded_msg = 576 # Bits in PDS frame (72 bytes)
self.pds_msg = []
self.msg_packed = []
self.msg_count = 0
self.state = SEARCH
def pack_bytes(self):
"""Pack unpacked bit list into bytearray."""
self.msg_count += 1
a = [
int("".join(map(str, self.pds_msg[i : i + 8])), 2)
for i in range(0, len(self.pds_msg), 8)
]
self.msg_packed = bytearray(a)
def work(self, input_items, output_items):
in0 = input_items[0]
num_input_items = len(in0)
return_value = num_input_items
nread = self.nitems_read(0)
if self.state == SEARCH:
tags = self.get_tags_in_window(0, 0, num_input_items)
if len(tags) > 0:
for t in tags:
t_str = pmt.symbol_to_string(t.key)
if t_str == self.tag_name:
# Frame sync detected - start collecting
del self.pds_msg[:]
del self.msg_packed[:]
cur_idx = t.offset - nread
self.pds_msg.extend(in0[cur_idx:])
self.state = COPY
elif self.state == COPY:
cur_msg_len = len(self.pds_msg)
if (cur_msg_len + num_input_items) < self.len_encoded_msg:
# Still collecting bits
self.pds_msg.extend(in0)
else:
# Frame complete
num_remain = self.len_encoded_msg - cur_msg_len
self.pds_msg.extend(in0[0:num_remain])
return_value = num_remain
self.pack_bytes()
msg_str = f"[{self.msg_count:d}] {binascii.hexlify(self.msg_packed).decode()}"
print(msg_str)
# Emit PDU with packed frame
self.message_port_pub(
pmt.intern("out"),
pmt.cons(
pmt.PMT_NIL,
pmt.init_u8vector(len(self.msg_packed), self.msg_packed),
),
)
self.state = SEARCH
return return_value