Ported from zleffke/gr-sarsat (2018, GNU Radio 3.7): - Updated Python 2 → Python 3 - Converted XML block definitions to YAML - Added pyproject.toml for modern packaging - Preserved original MIT license Blocks: - biphase_l_decode_bb: Manchester decoder (decim_block) - pds_frame_sync: Frame synchronizer with PDU output - sarp_msg_extract: SARP message splitter/validator
113 lines
3.4 KiB
Python
113 lines
3.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# MIT License
|
|
# Copyright (c) 2018 zleffke
|
|
# Ported to GNU Radio 3.10+ by gr-mcp
|
|
|
|
"""
|
|
PDS (Processed Data Stream) Frame Synchronizer.
|
|
|
|
Cospas-Sarsat 406 MHz beacons transmit:
|
|
- 15-bit frame sync: 0x7FFF (all ones, Biphase-L encoded)
|
|
- 576-bit PDS frame containing 3 SARP messages
|
|
|
|
This block:
|
|
1. Waits for frame sync tag from upstream correlator
|
|
2. Collects the following 576 bits
|
|
3. Packs bits into bytes
|
|
4. Emits as PDU message
|
|
"""
|
|
|
|
import numpy as np
|
|
import pmt
|
|
import binascii
|
|
from gnuradio import gr
|
|
|
|
# Receiver state machine:
#   SEARCH - idle, scanning the incoming stream for a frame sync tag
#   COPY   - sync seen, accumulating frame bits until a full frame is held
SEARCH, COPY = 1, 2
|
|
|
|
|
|
class pds_frame_sync(gr.sync_block):
    """
    PDS Frame Synchronizer.

    Expects a "Correlate Access Code - Tag" block upstream. Operates on an
    unpacked byte stream (one bit per item, bit value in the LSB).

    Input:  bit stream carrying frame sync tags whose key equals ``tag_name``
    Output: PDU messages on port "out"; each PDU is ``(PMT_NIL, u8vector)``
            holding one packed frame of ``len_encoded_msg`` bits (72 bytes)

    Collection starts at the tagged item itself (inclusive) and stops after
    exactly ``len_encoded_msg`` bits. Tags seen while a frame is being
    collected are ignored.

    NOTE(review): this relies on the upstream block placing its tag on the
    first bit of the frame payload -- confirm against the flowgraph.
    """

    def __init__(self, tag_name="pds_sync"):
        """
        Create the synchronizer.

        Args:
            tag_name: key (as a string) of the stream tag that marks the
                start of a frame.
        """
        gr.sync_block.__init__(
            self,
            name="pds_frame_sync",
            in_sig=[np.int8],
            out_sig=None,
        )

        self.tag_name = tag_name
        self.message_port_register_out(pmt.intern("out"))
        self.len_encoded_msg = 576  # Bits in PDS frame (72 bytes)
        self.pds_msg = []  # unpacked bits of the frame being collected
        self.msg_packed = bytearray()  # most recently packed frame
        self.msg_count = 0  # number of frames packed so far
        self.state = SEARCH

    def pack_bytes(self):
        """
        Pack the unpacked bit list ``pds_msg`` into ``msg_packed``.

        Bits are packed MSB-first, eight per byte. A trailing group shorter
        than eight bits is right-aligned in its byte. Only the LSB of every
        item is used, so stray flag bits in the stream cannot raise.
        Increments ``msg_count`` as a side effect.
        """
        self.msg_count += 1
        packed = bytearray()
        for start in range(0, len(self.pds_msg), 8):
            value = 0
            for bit in self.pds_msg[start:start + 8]:
                value = (value << 1) | (int(bit) & 1)
            packed.append(value)
        self.msg_packed = packed

    def _first_sync_index(self, num_input_items):
        """Return the window-relative index of the earliest sync tag, or None."""
        nread = self.nitems_read(0)
        indices = [
            int(tag.offset - nread)
            for tag in self.get_tags_in_window(0, 0, num_input_items)
            if pmt.symbol_to_string(tag.key) == self.tag_name
        ]
        # Do not rely on tag ordering; explicitly take the earliest match so
        # that later frames in the same window are not silently dropped.
        return min(indices) if indices else None

    def _publish_frame(self):
        """Pack the collected frame, print it as hex and emit it as a PDU."""
        self.pack_bytes()

        msg_str = f"[{self.msg_count:d}] {binascii.hexlify(self.msg_packed).decode()}"
        print(msg_str)

        self.message_port_pub(
            pmt.intern("out"),
            pmt.cons(
                pmt.PMT_NIL,
                pmt.init_u8vector(len(self.msg_packed), self.msg_packed),
            ),
        )

    def work(self, input_items, output_items):
        """
        Consume input bits, collecting and publishing complete frames.

        Returns:
            Number of input items consumed. When a frame completes inside the
            window, only the items up to and including its last bit are
            consumed, so the remainder is presented again and searched for
            the next sync tag.
        """
        in0 = input_items[0]
        num_input_items = len(in0)

        if self.state == SEARCH:
            start = self._first_sync_index(num_input_items)
            if start is None:
                # No sync in this window: discard it entirely.
                return num_input_items
            # Frame sync detected - start collecting from the tagged item.
            del self.pds_msg[:]
            self.msg_packed = bytearray()
            self.state = COPY
        else:
            # Already collecting: this window continues the current frame.
            start = 0

        # Never take more than the bits still missing from the frame, even
        # when the window extends far beyond the end of the frame.
        num_missing = self.len_encoded_msg - len(self.pds_msg)
        end = min(num_input_items, start + num_missing)
        self.pds_msg.extend(in0[start:end])

        if len(self.pds_msg) >= self.len_encoded_msg:
            # Frame complete
            self._publish_frame()
            self.state = SEARCH

        # If the frame is still incomplete, ``end`` equals num_input_items,
        # i.e. the whole window has been consumed.
        return end
|