Expand from 5 to 8 mode screens. F6 Device provides firmware management, EEPROM flash with full safety state machine (C2 validation, auto-backup, 3s countdown, page write, byte verify), and diagnostics (boot test, I2C scan, register dump). F7 Stream does live TS capture with PID distribution and PAT/PMT tree. F8 Config manages LNB power, DiSEqC switching, and modulation/FEC. Foundation: 12 new SkyWalker1 methods (device info, FX2 RAM, EEPROM I2C, diagnostics), matching DemoDevice synthetics with realistic C2 image and TS packets, 20 bridge wrappers (RLock).
683 lines
23 KiB
Python
683 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Genpix SkyWalker-1 shared library.
|
|
|
|
Provides the SkyWalker1 USB interface class, constants, and signal
|
|
processing utilities used by skywalker.py and tune.py.
|
|
"""
|
|
|
|
import sys
|
|
import struct
|
|
import time
|
|
import math
|
|
|
|
try:
|
|
import usb.core
|
|
import usb.util
|
|
except ImportError:
|
|
print("pyusb required: pip install pyusb")
|
|
sys.exit(1)
|
|
|
|
|
|
# --- USB identifiers ---
|
|
|
|
# Vendor/product pair handed to usb.core.find() when opening the device.
VENDOR_ID = 0x09C0
PRODUCT_ID = 0x0203

# Bulk IN endpoint read by read_stream(), and its default read size in bytes.
EP2_ADDR = 0x82
EP2_URB_SIZE = 8192

# --- Vendor commands ---
# Request codes passed as bRequest on vendor control transfers.

CMD_GET_8PSK_CONFIG = 0x80      # config status byte (decoded via CONFIG_BITS)
CMD_I2C_WRITE = 0x83            # used for EEPROM page writes
CMD_I2C_READ = 0x84             # used for EEPROM reads
CMD_ARM_TRANSFER = 0x85         # start/stop the transport stream
CMD_TUNE_8PSK = 0x86            # 10-byte tune payload
CMD_GET_SIGNAL_STRENGTH = 0x87
CMD_LOAD_BCM4500 = 0x88
CMD_BOOT_8PSK = 0x89            # demodulator power / boot diagnostics
CMD_START_INTERSIL = 0x8A       # LNB power supply enable
CMD_SET_LNB_VOLTAGE = 0x8B
CMD_SET_22KHZ_TONE = 0x8C
CMD_SEND_DISEQC = 0x8D
CMD_GET_SIGNAL_LOCK = 0x90
CMD_GET_FW_VERS = 0x92
CMD_GET_SERIAL_NUMBER = 0x93
CMD_USE_EXTRA_VOLT = 0x94

# Custom commands (v3.01+)
CMD_SPECTRUM_SWEEP = 0xB0
CMD_RAW_DEMOD_READ = 0xB1
CMD_RAW_DEMOD_WRITE = 0xB2
CMD_BLIND_SCAN = 0xB3
CMD_I2C_BUS_SCAN = 0xB4
CMD_I2C_RAW_READ = 0xB5
CMD_I2C_DIAG = 0xB6

# Custom commands (v3.02+)
CMD_SIGNAL_MONITOR = 0xB7
CMD_TUNE_MONITOR = 0xB8
CMD_MULTI_REG_READ = 0xB9
|
|
|
|
# --- Config status bits ---
|
|
|
|
# Maps each bit mask of the config status byte to
# (human-readable label, firmware flag name).
CONFIG_BITS = {
    1 << 0: ("8PSK Started", "bm8pskStarted"),
    1 << 1: ("BCM4500 FW Loaded", "bm8pskFW_Loaded"),
    1 << 2: ("LNB Power On", "bmIntersilOn"),
    1 << 3: ("DVB Mode", "bmDVBmode"),
    1 << 4: ("22 kHz Tone", "bm22kHz"),
    1 << 5: ("18V Selected", "bmSEL18V"),
    1 << 6: ("DC Tuned", "bmDCtuned"),
    1 << 7: ("Armed (streaming)", "bmArmed"),
}
|
|
|
|
# --- Modulation and FEC tables ---
|
|
|
|
# Modulation name -> (index sent in the tune payload, display label).
MODULATIONS = {
    "qpsk":        (0, "DVB-S QPSK"),
    "turbo-qpsk":  (1, "Turbo QPSK"),
    "turbo-8psk":  (2, "Turbo 8PSK"),
    "turbo-16qam": (3, "Turbo 16QAM"),
    "dcii-combo":  (4, "DCII Combo"),
    "dcii-i":      (5, "DCII I-stream"),
    "dcii-q":      (6, "DCII Q-stream"),
    "dcii-oqpsk":  (7, "DCII Offset QPSK"),
    "dss":         (8, "DSS QPSK"),
    "bpsk":        (9, "DVB BPSK"),
}
|
|
|
|
# FEC rate name -> index, grouped by FEC family (see MOD_FEC_GROUP for
# which family each modulation uses).
FEC_RATES = {
    "dvbs": {
        "1/2": 0,
        "2/3": 1,
        "3/4": 2,
        "5/6": 3,
        "7/8": 4,
        "auto": 5,
        "none": 6,
    },
    "turbo": {
        "1/2": 0,
        "2/3": 1,
        "3/4": 2,
        "5/6": 3,
        "auto": 4,
    },
    # Both names map to the same index in this family.
    "turbo-16qam": {
        "3/4": 0,
        "auto": 0,
    },
    # "auto" shares index 0 with "1/2" in this family.
    "dcii": {
        "1/2": 0,
        "2/3": 1,
        "6/7": 2,
        "3/4": 3,
        "5/11": 4,
        "1/2+": 5,
        "2/3+": 6,
        "6/7+": 7,
        "3/4+": 8,
        "auto": 0,
    },
}
|
|
|
|
# Modulation name -> key into FEC_RATES naming the FEC table it uses.
MOD_FEC_GROUP = {
    # DVB-S style rates
    "qpsk":        "dvbs",
    # Turbo-coded modes
    "turbo-qpsk":  "turbo",
    "turbo-8psk":  "turbo",
    "turbo-16qam": "turbo-16qam",
    # DigiCipher II variants
    "dcii-combo":  "dcii",
    "dcii-i":      "dcii",
    "dcii-q":      "dcii",
    "dcii-oqpsk":  "dcii",
    # These reuse the DVB-S rate table
    "dss":         "dvbs",
    "bpsk":        "dvbs",
}
|
|
|
|
# --- LNB defaults ---
|
|
|
|
# Universal LNB local-oscillator frequencies, in MHz.
LNB_LO_LOW = 9750     # low band
LNB_LO_HIGH = 10600   # high band

# --- L-band allocations (for annotation) ---
# Each entry is (start_mhz, stop_mhz, label).
LBAND_ALLOCATIONS = [
    (1240, 1300, "Amateur 23cm"),
    (1525, 1559, "Inmarsat downlink"),
    (1559, 1610, "GNSS (GPS L1, Galileo E1)"),
    (1610, 1626, "Iridium downlink"),
    (1670, 1710, "MetSat (GOES LRIT, NOAA HRPT)"),
    (1710, 1785, "LTE/AWS uplink"),
    (1920, 2025, "UMTS uplink"),
]
|
|
|
|
|
|
# --- Signal processing helpers ---
|
|
|
|
def snr_raw_to_db(snr_raw: int) -> float:
    """Scale a raw BCM4500 SNR register reading to dB.

    The register value is interpreted as the dB figure multiplied by 256.
    """
    counts_per_db = 256.0
    return snr_raw / counts_per_db
|
|
|
|
|
|
def snr_raw_to_pct(snr_raw: int) -> float:
    """Convert raw SNR to a percentage clamped to the 0-100 range.

    The raw value is multiplied by 17 and limited to the 16-bit range
    before being expressed as a fraction of 65535, so any reading of 3855
    or more reports 100%.  Negative input is clamped to 0% so the result
    never leaves the documented range.
    """
    full_scale = 65535
    scaled = max(0, min(snr_raw * 17, full_scale))
    return (scaled / full_scale) * 100
|
|
|
|
|
|
def agc_to_power_db(agc1: int, agc2: int) -> float:
    """
    Turn a pair of AGC register readings into a relative power estimate.

    The AGC loop raises gain when the input is weak, so a larger AGC
    reading means a weaker signal.  The mapping here is a linear
    approximation; the true curve depends on the BCM3440 tuner.

    agc1 is the primary gain word; agc2 is a fine adjustment and only its
    value shifted right by four bits is added in.

    Returns a relative dB figure where higher means stronger: 0.0 for a
    combined reading of zero, -40.0 for a combined reading of 65535.
    """
    gain_word = agc1 + (agc2 >> 4)
    if gain_word != 0:
        # Linear map: 0 -> 0 dB, 65535 -> -40 dB.
        return -40.0 * (gain_word / 65535.0)
    # Explicit zero so the result is +0.0 rather than -0.0.
    return 0.0
|
|
|
|
|
|
def detect_peaks(freqs: list, powers: list, threshold_db: float = 3.0) -> list:
    """
    Locate local maxima in a spectrum sweep.

    freqs and powers are parallel sequences.  The noise floor is taken as
    the sample at the lower-quartile position of the sorted powers.  An
    interior sample is reported when it is strictly greater than both of
    its neighbours and at least threshold_db above that floor; the first
    and last samples are never reported.

    Returns a list of (freq_mhz, power_db, index) tuples in index order,
    or an empty list when fewer than three samples are given.
    """
    count = len(powers)
    if count < 3:
        return []

    # Lower-quartile sample stands in for the noise floor.
    noise_floor = sorted(powers)[count // 4]

    found = []
    for idx in range(1, count - 1):
        level = powers[idx]
        is_local_max = level > powers[idx - 1] and level > powers[idx + 1]
        if is_local_max and level - noise_floor >= threshold_db:
            found.append((freqs[idx], level, idx))
    return found
|
|
|
|
|
|
def if_to_rf(if_mhz: float, lnb_lo: float) -> float:
    """Return the RF frequency for an IF frequency and LNB LO.

    The result is the sum of the two arguments, in whatever unit they share.
    """
    rf_mhz = if_mhz + lnb_lo
    return rf_mhz
|
|
|
|
|
|
def rf_to_if(rf_mhz: float, lnb_lo: float) -> float:
    """Return the IF frequency for an RF frequency and LNB LO.

    The result is rf_mhz minus lnb_lo; it is negative when the LO is
    above the RF frequency.
    """
    if_mhz = rf_mhz - lnb_lo
    return if_mhz
|
|
|
|
|
|
def signal_bar(pct: float, width: int = 40) -> str:
    """Draw an ASCII meter for a percentage.

    The meter is width characters wide: '#' for the filled part and '-'
    for the rest.  The fill is clamped to the meter width, but the numeric
    suffix always shows pct as given, to one decimal place.
    """
    cells = int(pct / 100 * width)
    cells = max(0, min(cells, width))
    # Pad the filled run out to the full meter width with '-'.
    meter = ('#' * cells).ljust(width, '-')
    return f"[{meter}] {pct:.1f}%"
|
|
|
|
|
|
def format_config_bits(status: int) -> list:
    """Decode the config status byte into (label, is_set) pairs.

    One pair is produced per CONFIG_BITS entry, in that table's order;
    is_set is True when the corresponding mask bit is present in status.
    """
    return [
        (label, bool(status & mask))
        for mask, (label, _field) in CONFIG_BITS.items()
    ]
|
|
|
|
|
|
# --- SkyWalker1 USB interface ---
|
|
|
|
class SkyWalker1:
|
|
"""USB interface to the Genpix SkyWalker-1 DVB-S receiver."""
|
|
|
|
def __init__(self, verbose: bool = False):
    """Create an unopened handle; no USB access happens here.

    verbose: when True, transfers and driver actions are echoed with
    print().
    """
    self.verbose = verbose
    # Interface number whose kernel driver open() detached, if any.
    self.detached_intf = None
    # USB device object; assigned by open().
    self.dev = None
|
|
|
|
def open(self) -> None:
    """Find and claim the SkyWalker-1 USB device.

    Looks the device up by VENDOR_ID/PRODUCT_ID, detaches any kernel
    driver bound to its interfaces (remembering the interface number so
    close() can re-attach it), then selects the default configuration.

    Exits the process with status 1 when the device is not present or a
    bound kernel driver cannot be detached.  A failing set_configuration()
    is tolerated as before, but is now reported in verbose mode instead of
    vanishing silently.
    """
    self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
    if self.dev is None:
        print("SkyWalker-1 not found (VID 0x09C0, PID 0x0203). Is it plugged in?")
        sys.exit(1)

    for cfg in self.dev:
        for intf in cfg:
            number = intf.bInterfaceNumber
            try:
                driver_bound = self.dev.is_kernel_driver_active(number)
            except NotImplementedError:
                # Backends without kernel-driver support raise this;
                # there is nothing to detach in that case.
                driver_bound = False
            if not driver_bound:
                continue
            try:
                self.dev.detach_kernel_driver(number)
                self.detached_intf = number
                if self.verbose:
                    print(f" Detached kernel driver from interface {number}")
            except usb.core.USBError as e:
                print(f"Cannot detach kernel driver: {e}")
                print("The gp8psk module must be unbound first. Try one of:")
                print(" sudo modprobe -r dvb_usb_gp8psk")
                print(" echo '<bus-path>' | sudo tee /sys/bus/usb/drivers/gp8psk/unbind")
                sys.exit(1)

    try:
        self.dev.set_configuration()
    except usb.core.USBError as e:
        # Best effort: carry on with whatever configuration is active.
        if self.verbose:
            print(f" set_configuration failed (continuing): {e}")
|
|
|
|
def close(self) -> None:
    """Release the device and re-attach the kernel driver.

    Does nothing when the device was never opened.  If open() detached a
    kernel driver, the interface is released and the driver re-attached;
    on failure a hint about reloading the module is printed.

    The USB resources are always disposed and the handle state reset, so
    calling close() a second time is a harmless no-op rather than a
    second re-attach attempt.
    """
    if self.dev is None:
        return
    try:
        if self.detached_intf is not None:
            try:
                usb.util.release_interface(self.dev, self.detached_intf)
                self.dev.attach_kernel_driver(self.detached_intf)
                if self.verbose:
                    print("Re-attached kernel driver")
            except usb.core.USBError:
                print("Note: run 'sudo modprobe dvb_usb_gp8psk' to reload driver")
    finally:
        # Free the underlying handle even if re-attachment failed.
        usb.util.dispose_resources(self.dev)
        self.detached_intf = None
        self.dev = None
|
|
|
|
def __enter__(self):
    """Open the device on entering a ``with`` block and return this object."""
    self.open()
    handle = self
    return handle
|
|
|
|
def __exit__(self, *exc_info):
    """Close the device on leaving a ``with`` block.

    Returns None, so an exception raised inside the block propagates.
    """
    self.close()
    return None
|
|
|
|
# -- Low-level USB transfers --
|
|
|
|
def _vendor_in(self, request: int, value: int = 0, index: int = 0,
               length: int = 64, retries: int = 3) -> bytes:
    """Vendor IN control transfer (device-to-host), with retry.

    Issues the request up to ``retries`` times with a 2000 ms timeout.
    A reply of exactly ``length`` bytes is returned at once; a shorter
    reply triggers another attempt, and the last short reply is returned
    if none comes back full-length.  A USBError is re-raised only when it
    happens on the final attempt.

    At least one attempt is always made: ``retries`` below 1 used to
    fall through to an unbound local instead of talking to the device.
    """
    attempts = max(1, retries)
    data = b''
    for attempt in range(attempts):
        try:
            data = self.dev.ctrl_transfer(
                usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
                request, value, index, length, 2000
            )
        except usb.core.USBError as e:
            if self.verbose:
                print(f" USB IN req=0x{request:02X} FAILED: {e}")
            if attempt == attempts - 1:
                raise
            continue

        if self.verbose:
            raw = bytes(data).hex(' ')
            print(f" USB IN req=0x{request:02X} val=0x{value:04X} "
                  f"idx=0x{index:04X} -> [{len(data)}] {raw}")
        if len(data) == length:
            return bytes(data)
        if self.verbose:
            print(f" Partial read ({len(data)}/{length}), retry {attempt + 1}")

    # Every attempt came back short (or an earlier one did and later ones
    # failed before the last): hand back what we have.
    return bytes(data)
|
|
|
|
def _vendor_out(self, request: int, value: int = 0, index: int = 0,
                data: bytes = b'') -> int:
    """Vendor OUT control transfer (host-to-device).

    Sends ``data`` as the payload with a 2000 ms timeout and returns
    whatever ctrl_transfer() returns.  In verbose mode the request is
    echoed before it is sent.
    """
    if self.verbose:
        if data:
            raw = data.hex(' ')
        else:
            raw = "(no data)"
        print(f" USB OUT req=0x{request:02X} val=0x{value:04X} "
              f"idx=0x{index:04X} data=[{len(data)}] {raw}")

    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT
    return self.dev.ctrl_transfer(request_type, request, value, index,
                                  data, 2000)
|
|
|
|
# -- Device info --
|
|
|
|
def get_config(self) -> int:
    """Return the 8PSK config status byte (bit meanings: CONFIG_BITS)."""
    return self._vendor_in(CMD_GET_8PSK_CONFIG, length=1)[0]
|
|
|
|
def get_fw_version(self) -> dict:
    """Read the firmware version.

    The 6-byte reply is laid out as patch, minor, major, day, month,
    two-digit year.  Returns a dict with the numeric "major", "minor"
    and "patch" fields, a "version" string and a "date" string formatted
    as 20YY-MM-DD.
    """
    reply = self._vendor_in(CMD_GET_FW_VERS, length=6)
    patch, minor, major = reply[0], reply[1], reply[2]
    day, month, year = reply[3], reply[4], reply[5]
    return {
        "major": major,
        "minor": minor,
        "patch": patch,
        "version": f"{major}.{minor:02d}.{patch}",
        "date": f"20{year:02d}-{month:02d}-{day:02d}",
    }
|
|
|
|
def get_signal_lock(self) -> bool:
    """Return True when the lock status byte read from the device is non-zero."""
    lock_byte = self._vendor_in(CMD_GET_SIGNAL_LOCK, length=1)[0]
    return bool(lock_byte)
|
|
|
|
def get_signal_strength(self) -> dict:
    """Read signal strength and return an SNR info dict.

    The first two bytes of the 6-byte reply are a little-endian raw SNR
    value.  Keys: "snr_raw", "snr_db", "snr_pct", and "raw_bytes" (the
    whole reply as space-separated hex).
    """
    reply = self._vendor_in(CMD_GET_SIGNAL_STRENGTH, length=6)
    (snr_raw,) = struct.unpack_from('<H', reply, 0)
    info = {"snr_raw": snr_raw}
    info["snr_db"] = snr_raw_to_db(snr_raw)
    info["snr_pct"] = snr_raw_to_pct(snr_raw)
    info["raw_bytes"] = bytes(reply).hex(' ')
    return info
|
|
|
|
# -- Power and boot --
|
|
|
|
def boot(self, on: bool = True) -> int:
    """Power the 8PSK demodulator on or off.

    Returns the single status byte the device sends back.
    """
    reply = self._vendor_in(CMD_BOOT_8PSK, value=int(on), length=1)
    status_byte = reply[0]
    return status_byte
|
|
|
|
def start_intersil(self, on: bool = True) -> int:
    """Switch the LNB power supply on or off.

    Returns the single status byte the device sends back.
    """
    reply = self._vendor_in(CMD_START_INTERSIL, value=int(on), length=1)
    status_byte = reply[0]
    return status_byte
|
|
|
|
def set_lnb_voltage(self, high: bool) -> None:
    """Select the LNB supply level: 18V when high is true, otherwise 13V."""
    level = int(high)
    self._vendor_out(CMD_SET_LNB_VOLTAGE, value=level)
|
|
|
|
def set_22khz_tone(self, on: bool) -> None:
    """Turn the 22 kHz tone on or off."""
    tone_flag = int(on)
    self._vendor_out(CMD_SET_22KHZ_TONE, value=tone_flag)
|
|
|
|
def set_extra_voltage(self, on: bool) -> None:
    """Turn the +1V LNB boost on or off (13V becomes 14V, 18V becomes 19V)."""
    boost_flag = int(on)
    self._vendor_out(CMD_USE_EXTRA_VOLT, value=boost_flag)
|
|
|
|
# -- Tuning --
|
|
|
|
def tune(self, symbol_rate_sps: int, freq_khz: int,
         mod_index: int, fec_index: int) -> None:
    """Send TUNE_8PSK with its 10-byte payload.

    Payload layout: symbol rate and frequency as little-endian 32-bit
    unsigned integers, followed by one byte each for the modulation and
    FEC indices.
    """
    header = struct.pack('<II', symbol_rate_sps, freq_khz)
    selectors = bytes([mod_index, fec_index])
    self._vendor_out(CMD_TUNE_8PSK, data=header + selectors)
|
|
|
|
# -- Streaming --
|
|
|
|
def arm_transfer(self, on: bool) -> None:
    """Start (on=True) or stop (on=False) the MPEG-2 transport stream."""
    arm_flag = int(on)
    self._vendor_out(CMD_ARM_TRANSFER, value=arm_flag)
|
|
|
|
def read_stream(self, size: int = EP2_URB_SIZE,
                timeout: int = 1000) -> bytes:
    """Read one chunk from the TS bulk endpoint.

    Returns the bytes read, or b'' when the read times out or fails with
    any other USBError.  Non-timeout errors are printed in verbose mode;
    timeouts are always silent.
    """
    try:
        chunk = self.dev.read(EP2_ADDR, size, timeout)
    except usb.core.USBError as err:
        timed_out = isinstance(err, usb.core.USBTimeoutError)
        if self.verbose and not timed_out:
            print(f" EP2 read error: {err}")
        return b''
    return bytes(chunk)
|
|
|
|
# -- DiSEqC --
|
|
|
|
def send_diseqc_tone_burst(self, mini_cmd: int) -> None:
    """Send a mini-DiSEqC tone burst: 0 for SEC_MINI_A, 1 for SEC_MINI_B."""
    burst = mini_cmd
    self._vendor_out(CMD_SEND_DISEQC, value=burst)
|
|
|
|
def send_diseqc_message(self, msg: bytes) -> None:
    """Send a full DiSEqC message.

    msg must be 3 to 6 bytes long; its first byte is also passed as the
    transfer's value field.  Raises ValueError for any other length,
    before anything is sent.
    """
    n_bytes = len(msg)
    if not 3 <= n_bytes <= 6:
        raise ValueError(f"DiSEqC message must be 3-6 bytes, got {n_bytes}")
    self._vendor_out(CMD_SEND_DISEQC, value=msg[0], data=msg)
|
|
|
|
# -- New commands (v3.02+) --
|
|
|
|
def signal_monitor(self) -> dict:
    """
    Fast combined signal read (0xB7): one 8-byte transfer.

    Reply layout: SNR(2) + AGC1(2) + AGC2(2) + lock(1) + status(1), with
    the 16-bit fields little-endian.

    Returns a dict with the raw fields plus derived "snr_db", "snr_pct"
    and "power_db"; "locked" is True when bit 0x20 of the lock byte is set.
    """
    reply = self._vendor_in(CMD_SIGNAL_MONITOR, length=8)
    snr_raw, agc1, agc2 = struct.unpack_from('<HHH', reply, 0)
    lock_byte = reply[6]
    status_byte = reply[7]
    return {
        "snr_raw": snr_raw,
        "snr_db": snr_raw_to_db(snr_raw),
        "snr_pct": snr_raw_to_pct(snr_raw),
        "agc1": agc1,
        "agc2": agc2,
        "power_db": agc_to_power_db(agc1, agc2),
        "lock": lock_byte,
        "locked": bool(lock_byte & 0x20),
        "status": status_byte,
    }
|
|
|
|
def tune_monitor(self, symbol_rate_sps: int, freq_khz: int,
                 mod_index: int, fec_index: int,
                 dwell_ms: int = 10) -> dict:
    """
    Tune + dwell + signal read in one round-trip (0xB8).

    OUT phase: the 10-byte tune payload (same layout as tune()) is sent
    with the dwell time in the value field; the firmware tunes, waits and
    samples the signal.  IN phase: the stored 10-byte result is read back.

    dwell_ms is clamped to 1..255 before sending.

    Returns a dict with "snr_raw", "snr_db", "snr_pct", "agc1", "agc2",
    "power_db", "lock", "locked" (bit 0x20 of the lock byte), "status"
    and "dwell_ms" (the 16-bit value in the last two reply bytes).
    "snr_pct" is included so the result carries the same SNR fields as
    signal_monitor() and get_signal_strength().
    """
    dwell_ms = max(1, min(255, dwell_ms))
    payload = struct.pack('<II', symbol_rate_sps, freq_khz)
    payload += bytes([mod_index, fec_index])

    # OUT phase: send tune data, firmware does tune + dwell + read.
    self._vendor_out(CMD_TUNE_MONITOR, value=dwell_ms, data=payload)

    # IN phase: fetch the stored measurement.
    data = self._vendor_in(CMD_TUNE_MONITOR, length=10)
    snr_raw, agc1, agc2 = struct.unpack_from('<HHH', data, 0)
    return {
        "snr_raw": snr_raw,
        "snr_db": snr_raw_to_db(snr_raw),
        "snr_pct": snr_raw_to_pct(snr_raw),
        "agc1": agc1,
        "agc2": agc2,
        "power_db": agc_to_power_db(agc1, agc2),
        "lock": data[6],
        "locked": bool(data[6] & 0x20),
        "status": data[7],
        "dwell_ms": struct.unpack_from('<H', data, 8)[0],
    }
|
|
|
|
def multi_reg_read(self, start_reg: int, count: int) -> bytes:
    """
    Batch read of contiguous BCM4500 indirect registers (0xB9).

    start_reg goes in the value field and the register count in the index
    field.  count is clamped to 1..64, so at most 64 registers are read
    per call regardless of what was asked for.

    Returns one byte per register.
    """
    if count < 1:
        count = 1
    elif count > 64:
        count = 64
    reply = self._vendor_in(CMD_MULTI_REG_READ, value=start_reg,
                            index=count, length=count)
    return bytes(reply)
|
|
|
|
# -- Device info (extended) --
|
|
|
|
def get_serial_number(self) -> bytes:
    """Return the device serial number as the raw 8-byte reply."""
    serial = self._vendor_in(CMD_GET_SERIAL_NUMBER, length=8)
    return serial
|
|
|
|
def get_usb_speed(self) -> int:
    """Return the USB connection speed code: 0=unknown, 1=Full, 2=High.

    Issued as vendor request 0x07 with a one-byte reply.
    """
    return self._vendor_in(0x07, length=1)[0]
|
|
|
|
def get_vendor_string(self) -> str:
    """Read the vendor string from the FX2 (vendor request 0x0C).

    Up to 64 bytes are requested; trailing NUL bytes are stripped and the
    rest is decoded as ASCII, with undecodable bytes replaced.
    """
    reply = self._vendor_in(0x0C, length=64)
    trimmed = bytes(reply).rstrip(b'\x00')
    return trimmed.decode('ascii', errors='replace')
|
|
|
|
def get_product_string(self) -> str:
    """Read the product string from the FX2 (vendor request 0x0D).

    Up to 64 bytes are requested; trailing NUL bytes are stripped and the
    rest is decoded as ASCII, with undecodable bytes replaced.
    """
    reply = self._vendor_in(0x0D, length=64)
    trimmed = bytes(reply).rstrip(b'\x00')
    return trimmed.decode('ascii', errors='replace')
|
|
|
|
# -- FX2 RAM access (standard Cypress A0 vendor request) --
|
|
|
|
def fx2_ram_read(self, addr: int, length: int) -> bytes:
    """Read FX2 internal RAM via the A0 vendor request. Non-destructive.

    addr goes in the value field.  length is clamped to 1..64, so a
    larger request silently yields at most 64 bytes.  The transfer is
    issued directly (single attempt, 2000 ms timeout).
    """
    if length < 1:
        length = 1
    elif length > 64:
        length = 64

    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN
    reply = self.dev.ctrl_transfer(request_type, 0xA0, addr, 0,
                                   length, 2000)
    if self.verbose:
        raw = bytes(reply).hex(' ')
        print(f" RAM IN addr=0x{addr:04X} len={length} -> {raw}")
    return bytes(reply)
|
|
|
|
def fx2_ram_write(self, addr: int, data: bytes) -> int:
    """Write FX2 internal RAM via the A0 vendor request.

    Per the original note, the change reverts on power cycle.  addr goes
    in the value field and data is the payload; returns whatever
    ctrl_transfer() returns.  In verbose mode the write is echoed first.
    """
    if self.verbose:
        raw = data.hex(' ')
        print(f" RAM OUT addr=0x{addr:04X} len={len(data)} data={raw}")

    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT
    return self.dev.ctrl_transfer(request_type, 0xA0, addr, 0, data, 2000)
|
|
|
|
def fx2_cpu_halt(self) -> None:
    """Halt the FX2 CPU by writing 1 to the CPUCS register (0xE600)."""
    cpucs_addr = 0xE600
    self.fx2_ram_write(cpucs_addr, b'\x01')
|
|
|
|
def fx2_cpu_start(self) -> None:
    """Release the FX2 CPU by writing 0 to the CPUCS register (0xE600)."""
    cpucs_addr = 0xE600
    self.fx2_ram_write(cpucs_addr, b'\x00')
|
|
|
|
# -- EEPROM access (via I2C proxy commands) --
|
|
|
|
# I2C slave address passed in the value field of EEPROM transfers.
EEPROM_SLAVE = 0x51
# Largest payload, in bytes, for one eeprom_write_page() call.
EEPROM_PAGE_SIZE = 16
# Write-cycle time in milliseconds.  NOTE(review): not referenced by the
# methods shown here; presumably the delay callers leave between page
# writes -- confirm against the flashing code.
EEPROM_WRITE_CYCLE_MS = 10
|
|
|
|
def eeprom_read(self, offset: int, length: int = 64) -> bytes:
    """Read ``length`` bytes from the boot EEPROM starting at ``offset``.

    Goes through the I2C read command, with the EEPROM slave address in
    the value field and the offset in the index field.
    """
    slave = self.EEPROM_SLAVE
    return self._vendor_in(CMD_I2C_READ, value=slave,
                           index=offset, length=length)
|
|
|
|
def eeprom_write_page(self, offset: int, data: bytes) -> int:
    """Write one page (up to EEPROM_PAGE_SIZE bytes) to the EEPROM.

    The caller is still responsible for page alignment and for waiting
    out the write cycle between pages.  The size limit itself is now
    enforced here: a longer payload is rejected before anything is sent,
    since this is the last stop before bytes reach the boot EEPROM.

    Returns whatever the underlying OUT transfer returns.
    Raises ValueError when data is longer than EEPROM_PAGE_SIZE.
    """
    if len(data) > self.EEPROM_PAGE_SIZE:
        raise ValueError(
            f"EEPROM page write is limited to {self.EEPROM_PAGE_SIZE} bytes, "
            f"got {len(data)}")
    return self._vendor_out(CMD_I2C_WRITE, value=self.EEPROM_SLAVE,
                            index=offset, data=data)
|
|
|
|
def eeprom_read_all(self, size: int = 16384) -> bytes:
    """Read the first ``size`` bytes of the EEPROM.

    The image is fetched in 64-byte chunks via eeprom_read(); the final
    chunk is shortened so exactly ``size`` bytes are requested in total.

    Every chunk must come back at the requested length.  A short chunk
    would shift all later data to the wrong offset, leaving a corrupt
    image that still looks plausible, so it is reported instead.

    Raises RuntimeError on a short chunk.
    """
    chunk_size = 64
    image = bytearray()
    for offset in range(0, size, chunk_size):
        wanted = min(chunk_size, size - offset)
        chunk = self.eeprom_read(offset, wanted)
        if len(chunk) != wanted:
            raise RuntimeError(
                f"Short EEPROM read at offset 0x{offset:04X}: "
                f"got {len(chunk)} of {wanted} bytes")
        image.extend(chunk)
    return bytes(image)
|
|
|
|
# -- Diagnostics --
|
|
|
|
def boot_debug(self, mode: int) -> dict:
    """
    Run a boot diagnostic selected by the mode byte.

    Modes: 0x80=no-op, 0x81=GPIO init, 0x82=I2C probe,
    0x83=BCM4500 reset, 0x84=FW load, 0x85=full boot.

    The mode is sent in the value field of the boot command and a 3-byte
    status is read back.  Returns a dict with "stage", "result" and
    "detail" taken from those three bytes in order.
    """
    reply = self._vendor_in(CMD_BOOT_8PSK, value=mode, length=3)
    stage, result, detail = reply[0], reply[1], reply[2]
    return {"stage": stage, "result": result, "detail": detail}
|
|
|
|
def i2c_bus_scan(self) -> list[int]:
    """
    Scan the I2C bus for responding devices.

    The firmware replies with a 16-byte bitmap (128 bits, one per 7-bit
    address 0-127); bit b of byte n stands for address n * 8 + b.

    Returns the addresses whose bit is set, in ascending order.
    """
    bitmap = self._vendor_in(CMD_I2C_BUS_SCAN, length=16)
    return [
        byte_idx * 8 + bit_idx
        for byte_idx in range(16)
        for bit_idx in range(8)
        if bitmap[byte_idx] & (1 << bit_idx)
    ]
|
|
|
|
def i2c_raw_read(self, slave: int, reg: int) -> int:
    """Read one register from an I2C device.

    The slave address goes in the value field and the register number in
    the index field; the single reply byte is returned.
    """
    return self._vendor_in(CMD_I2C_RAW_READ, value=slave,
                           index=reg, length=1)[0]
|
|
|
|
# -- High-level sweep helpers --
|
|
|
|
def sweep_spectrum(self, start_mhz: float, stop_mhz: float,
                   step_mhz: float = 5.0, dwell_ms: int = 10,
                   sr_ksps: int = 20000, mod_index: int = 0,
                   fec_index: int = 5,
                   callback=None) -> tuple:
    """
    Sweep a frequency range and return power measurements.

    Uses TUNE_MONITOR (0xB8) at each step for efficient measurement.
    Default tune params: QPSK, auto-FEC, 20 Msps.

    Points run from start_mhz to stop_mhz inclusive in step_mhz
    increments.  Each frequency is computed as start + n * step rather
    than by repeated addition, so fractional steps do not drift, the end
    point is not lost to rounding, and the total passed to the callback
    always matches the number of points measured.  The kHz value sent to
    the device is rounded to the nearest integer instead of truncated.

    callback(freq_mhz, step_num, total_steps, result) is called
    per step if provided.

    Returns (freqs_mhz[], powers_db[], raw_results[]); all three are
    empty when stop_mhz is below start_mhz.

    Raises ValueError when step_mhz is not positive (a zero or negative
    step could never reach stop_mhz).
    """
    if step_mhz <= 0:
        raise ValueError(f"step_mhz must be positive, got {step_mhz}")

    sr_sps = sr_ksps * 1000
    freqs = []
    powers = []
    results = []

    if stop_mhz < start_mhz:
        return freqs, powers, results

    # Small tolerance so a span that is a whole number of steps is not
    # shortened by floating-point error in the division.
    steps = int((stop_mhz - start_mhz) / step_mhz + 1e-9) + 1

    for step_num in range(steps):
        # Never overshoot the requested upper bound by a rounding error.
        freq = min(start_mhz + step_num * step_mhz, stop_mhz)
        freq_khz = int(round(freq * 1000))
        result = self.tune_monitor(sr_sps, freq_khz, mod_index,
                                   fec_index, dwell_ms)
        freqs.append(freq)
        powers.append(result["power_db"])
        results.append(result)

        if callback:
            callback(freq, step_num, steps, result)

    return freqs, powers, results
|
|
|
|
def ensure_booted(self) -> None:
    """Bring up the demodulator and LNB power unless already running.

    Checks the config status byte: if the 0x01 bit is clear the
    demodulator is booted, given 0.5 s, and re-checked; if the 0x04 bit
    is clear LNB power is switched on and given 0.3 s.

    Raises RuntimeError when the 0x01 bit is still clear after booting.
    """
    started_mask = 0x01
    lnb_power_mask = 0x04

    status = self.get_config()
    if (status & started_mask) == 0:
        self.boot(on=True)
        time.sleep(0.5)
        status = self.get_config()
        if (status & started_mask) == 0:
            raise RuntimeError("Device failed to start")

    if (status & lnb_power_mask) == 0:
        self.start_intersil(on=True)
        time.sleep(0.3)
|
|
|
|
def configure_lnb(self, pol: str = None, band: str = None,
                  lnb_lo: float = None, disable_lnb: bool = False) -> float:
    """
    Configure LNB voltage and tone, and return the effective LO frequency.

    pol: 'H'/'V'/'L'/'R' or None (don't change); 'H' and 'L' select the
         high voltage, anything else the low one.
    band: 'low'/'high' or None (don't change); 'high' turns the 22 kHz
          tone on.  Matched case-insensitively, like pol, so 'HIGH' no
          longer silently selects the low band.
    lnb_lo: explicit LO freq in MHz, or None for auto
    disable_lnb: True to disable LNB power (for direct input)

    Returns 0.0 when the LNB is disabled; otherwise lnb_lo if given,
    else LNB_LO_HIGH for the high band and LNB_LO_LOW for everything else.
    """
    if disable_lnb:
        self.start_intersil(on=False)
        return 0.0

    if pol:
        high_voltage = pol.upper() in ("H", "L")
        self.set_lnb_voltage(high_voltage)

    # Normalise once so the tone setting and the LO choice always agree.
    band_key = band.lower() if band else None
    if band_key:
        self.set_22khz_tone(band_key == "high")

    if lnb_lo is not None:
        return lnb_lo
    if band_key == "high":
        return LNB_LO_HIGH
    return LNB_LO_LOW
|