Add alternative operating modes: spectrum, scan, monitor, lband, track

Firmware v3.02.0 adds three new vendor commands:
- 0xB7 SIGNAL_MONITOR: fast 8-byte combined signal read
- 0xB8 TUNE_MONITOR: tune + dwell + read in one round-trip
- 0xB9 MULTI_REG_READ: batch read up to 64 indirect registers

New tools/skywalker.py provides five modes that use the BCM4500's
AGC registers as a crude power detector across 950-2150 MHz IF,
even without demodulator lock:
- spectrum: sweep analyzer with ASCII/waterfall/matplotlib display
- scan: automated transponder scanner (sweep + peak detect + blind scan)
- monitor: real-time signal strength for dish alignment
- lband: direct input analyzer with L-band allocation annotations
- track: carrier/beacon tracker with CSV/JSON logging and drift detection

Extracts shared SkyWalker1 class and constants into skywalker_lib.py;
tune.py now imports from the shared library.
This commit is contained in:
Ryan Malloy 2026-02-12 17:29:00 -07:00
parent b21f4957f6
commit 23055f34ab
4 changed files with 1756 additions and 312 deletions

View File

@ -2,8 +2,9 @@
* Genpix SkyWalker-1 Custom Firmware
* For Cypress CY7C68013A (FX2LP) + Broadcom BCM4500 demodulator
*
* Stock-compatible vendor commands (0x80-0x94) plus new
* spectrum sweep, raw demod access, and blind scan commands (0xB0-0xB3).
* Stock-compatible vendor commands (0x80-0x94) plus custom
* spectrum sweep, raw demod access, blind scan (0xB0-0xB3),
* hardware diagnostics (0xB4-0xB6), and signal monitoring (0xB7-0xB9).
*
* SDCC + fx2lib toolchain. Loaded into FX2 RAM for testing.
*/
@ -53,6 +54,9 @@
#define RAW_DEMOD_READ 0xB1
#define RAW_DEMOD_WRITE 0xB2
#define BLIND_SCAN 0xB3
#define SIGNAL_MONITOR 0xB7
#define TUNE_MONITOR 0xB8
#define MULTI_REG_READ 0xB9
/* configuration status byte bits */
#define BM_STARTED 0x01
@ -85,6 +89,9 @@ volatile __bit got_sud;
static __xdata BYTE i2c_buf[16];
static __xdata BYTE i2c_rd[8];
/* TUNE_MONITOR result buffer: filled by OUT phase, returned by IN phase */
static __xdata BYTE tm_result[10];
/*
* BCM4500 register initialization data extracted from stock v2.06 firmware.
* FUN_CODE_0ddd writes these 3 blocks to BCM4500 indirect registers (page 0)
@ -1055,8 +1062,8 @@ BOOL handle_vendorcommand(BYTE cmd) {
/* 0x92: GET_FW_VERS -- return firmware version and build date */
case GET_FW_VERS:
EP0BUF[0] = 0x00; /* patch -> version 3.01.0 */
EP0BUF[1] = 0x01; /* minor */
EP0BUF[0] = 0x00; /* patch -> version 3.02.0 */
EP0BUF[1] = 0x02; /* minor */
EP0BUF[2] = 0x03; /* major */
EP0BUF[3] = 0x0C; /* day = 12 */
EP0BUF[4] = 0x02; /* month = 2 */
@ -1216,6 +1223,90 @@ BOOL handle_vendorcommand(BYTE cmd) {
return TRUE;
}
/* 0xB7: SIGNAL_MONITOR -- fast combined signal read (8 bytes)
* Returns SNR(2) + AGC1(2) + AGC2(2) + lock(1) + status(1)
* in a single USB transfer instead of 3 separate reads. */
case SIGNAL_MONITOR: {
BYTE sm_val;
/* SNR: indirect regs 0x00-0x01 */
bcm_indirect_read(0x00, &EP0BUF[0]);
bcm_indirect_read(0x01, &EP0BUF[1]);
/* AGC1: indirect regs 0x02-0x03 */
bcm_indirect_read(0x02, &EP0BUF[2]);
bcm_indirect_read(0x03, &EP0BUF[3]);
/* AGC2: indirect regs 0x04-0x05 */
bcm_indirect_read(0x04, &EP0BUF[4]);
bcm_indirect_read(0x05, &EP0BUF[5]);
/* Lock register (direct 0xA4) */
sm_val = 0;
bcm_direct_read(BCM_REG_LOCK, &sm_val);
EP0BUF[6] = sm_val;
/* Status register (direct 0xA2) */
sm_val = 0;
bcm_direct_read(BCM_REG_STATUS, &sm_val);
EP0BUF[7] = sm_val;
EP0BCH = 0;
EP0BCL = 8;
return TRUE;
}
/* 0xB8: TUNE_MONITOR -- tune + dwell + read in one round-trip
* OUT phase (0x40): receive 10-byte tune payload, tune, dwell, read signal
* IN phase (0xC0): return stored 10-byte result
* wValue = dwell time in ms (1-255) */
case TUNE_MONITOR: {
if (SETUPDAT[0] & 0x80) {
/* IN phase: return stored result from previous OUT phase */
BYTE ti;
for (ti = 0; ti < 10; ti++)
EP0BUF[ti] = tm_result[ti];
EP0BCH = 0;
EP0BCL = 10;
} else {
/* OUT phase: tune, dwell, measure */
BYTE dwell = (BYTE)wval;
EP0BCL = 0;
SYNCDELAY;
while (EP0CS & bmEPBUSY)
;
do_tune();
if (dwell > 0)
delay(dwell);
/* Read signal into result buffer */
bcm_indirect_read(0x00, &tm_result[0]);
bcm_indirect_read(0x01, &tm_result[1]);
bcm_indirect_read(0x02, &tm_result[2]);
bcm_indirect_read(0x03, &tm_result[3]);
bcm_indirect_read(0x04, &tm_result[4]);
bcm_indirect_read(0x05, &tm_result[5]);
tm_result[6] = 0;
bcm_direct_read(BCM_REG_LOCK, &tm_result[6]);
tm_result[7] = 0;
bcm_direct_read(BCM_REG_STATUS, &tm_result[7]);
tm_result[8] = dwell;
tm_result[9] = (BYTE)(wval >> 8);
}
return TRUE;
}
/* 0xB9: MULTI_REG_READ -- batch read of contiguous indirect registers
* wValue = start register, wIndex = count (1-64)
* Returns count bytes, one per register */
case MULTI_REG_READ: {
BYTE start_reg = (BYTE)wval;
BYTE count = (BYTE)SETUP_INDEX();
BYTE mi;
if (count == 0 || count > 64)
count = 1;
for (mi = 0; mi < count; mi++) {
EP0BUF[mi] = 0;
bcm_indirect_read(start_reg + mi, &EP0BUF[mi]);
}
EP0BCH = 0;
EP0BCL = count;
return TRUE;
}
default:
return FALSE;
}

1086
tools/skywalker.py Executable file

File diff suppressed because it is too large Load Diff

565
tools/skywalker_lib.py Normal file
View File

@ -0,0 +1,565 @@
#!/usr/bin/env python3
"""
Genpix SkyWalker-1 shared library.
Provides the SkyWalker1 USB interface class, constants, and signal
processing utilities used by skywalker.py and tune.py.
"""
import sys
import struct
import time
import math
try:
import usb.core
import usb.util
except ImportError:
print("pyusb required: pip install pyusb")
sys.exit(1)
# --- USB identifiers ---
VENDOR_ID = 0x09C0
PRODUCT_ID = 0x0203
EP2_ADDR = 0x82
EP2_URB_SIZE = 8192
# --- Vendor commands ---
CMD_GET_8PSK_CONFIG = 0x80
CMD_I2C_WRITE = 0x83
CMD_I2C_READ = 0x84
CMD_ARM_TRANSFER = 0x85
CMD_TUNE_8PSK = 0x86
CMD_GET_SIGNAL_STRENGTH = 0x87
CMD_LOAD_BCM4500 = 0x88
CMD_BOOT_8PSK = 0x89
CMD_START_INTERSIL = 0x8A
CMD_SET_LNB_VOLTAGE = 0x8B
CMD_SET_22KHZ_TONE = 0x8C
CMD_SEND_DISEQC = 0x8D
CMD_GET_SIGNAL_LOCK = 0x90
CMD_GET_FW_VERS = 0x92
CMD_GET_SERIAL_NUMBER = 0x93
CMD_USE_EXTRA_VOLT = 0x94
# Custom commands (v3.01+)
CMD_SPECTRUM_SWEEP = 0xB0
CMD_RAW_DEMOD_READ = 0xB1
CMD_RAW_DEMOD_WRITE = 0xB2
CMD_BLIND_SCAN = 0xB3
CMD_I2C_BUS_SCAN = 0xB4
CMD_I2C_RAW_READ = 0xB5
CMD_I2C_DIAG = 0xB6
# Custom commands (v3.02+)
CMD_SIGNAL_MONITOR = 0xB7
CMD_TUNE_MONITOR = 0xB8
CMD_MULTI_REG_READ = 0xB9
# --- Config status bits ---
CONFIG_BITS = {
0x01: ("8PSK Started", "bm8pskStarted"),
0x02: ("BCM4500 FW Loaded", "bm8pskFW_Loaded"),
0x04: ("LNB Power On", "bmIntersilOn"),
0x08: ("DVB Mode", "bmDVBmode"),
0x10: ("22 kHz Tone", "bm22kHz"),
0x20: ("18V Selected", "bmSEL18V"),
0x40: ("DC Tuned", "bmDCtuned"),
0x80: ("Armed (streaming)", "bmArmed"),
}
# --- Modulation and FEC tables ---
MODULATIONS = {
"qpsk": (0, "DVB-S QPSK"),
"turbo-qpsk": (1, "Turbo QPSK"),
"turbo-8psk": (2, "Turbo 8PSK"),
"turbo-16qam": (3, "Turbo 16QAM"),
"dcii-combo": (4, "DCII Combo"),
"dcii-i": (5, "DCII I-stream"),
"dcii-q": (6, "DCII Q-stream"),
"dcii-oqpsk": (7, "DCII Offset QPSK"),
"dss": (8, "DSS QPSK"),
"bpsk": (9, "DVB BPSK"),
}
FEC_RATES = {
"dvbs": {
"1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3,
"7/8": 4, "auto": 5, "none": 6,
},
"turbo": {
"1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, "auto": 4,
},
"turbo-16qam": {
"3/4": 0, "auto": 0,
},
"dcii": {
"1/2": 0, "2/3": 1, "6/7": 2, "3/4": 3, "5/11": 4,
"1/2+": 5, "2/3+": 6, "6/7+": 7, "3/4+": 8, "auto": 0,
},
}
MOD_FEC_GROUP = {
"qpsk": "dvbs",
"turbo-qpsk": "turbo",
"turbo-8psk": "turbo",
"turbo-16qam": "turbo-16qam",
"dcii-combo": "dcii",
"dcii-i": "dcii",
"dcii-q": "dcii",
"dcii-oqpsk": "dcii",
"dss": "dvbs",
"bpsk": "dvbs",
}
# --- LNB defaults ---
LNB_LO_LOW = 9750 # Universal LNB low-band (MHz)
LNB_LO_HIGH = 10600 # Universal LNB high-band (MHz)
# --- L-band allocations (for annotation) ---
LBAND_ALLOCATIONS = [
(1240, 1300, "Amateur 23cm"),
(1525, 1559, "Inmarsat downlink"),
(1559, 1610, "GNSS (GPS L1, Galileo E1)"),
(1610, 1626, "Iridium downlink"),
(1670, 1710, "MetSat (GOES LRIT, NOAA HRPT)"),
(1710, 1785, "LTE/AWS uplink"),
(1920, 2025, "UMTS uplink"),
]
# --- Signal processing helpers ---
def snr_raw_to_db(snr_raw: int) -> float:
"""Convert BCM4500 SNR register to dB. Register is dBu * 256."""
return snr_raw / 256.0
def snr_raw_to_pct(snr_raw: int) -> float:
"""Convert raw SNR to percentage (0-100 scale, clamped)."""
scaled = min(snr_raw * 17, 65535)
return (scaled / 65535) * 100
def agc_to_power_db(agc1: int, agc2: int) -> float:
"""
Estimate received power from AGC register values.
The AGC loop adjusts gain to keep the signal level constant at the
ADC input. Higher AGC = weaker signal (more gain needed). This is
an approximation; the exact mapping depends on the BCM3440 tuner's
gain curve.
Returns a relative dB value (higher = stronger signal).
"""
# AGC1 is the primary gain control, AGC2 is fine adjustment.
# Invert: low AGC value = high signal = high power.
# Scale to approximate dB with ~40 dB dynamic range.
combined = agc1 + (agc2 >> 4)
if combined == 0:
return 0.0
# Rough linear-to-dB: 65535 AGC ≈ -40 dB, 0 AGC ≈ 0 dB
return -40.0 * (combined / 65535.0)
def detect_peaks(freqs: list, powers: list, threshold_db: float = 3.0) -> list:
"""
Find peaks in a spectrum sweep.
Returns list of (freq_mhz, power_db, index) tuples for each local
maximum that exceeds the noise floor by threshold_db.
"""
if len(powers) < 3:
return []
# Estimate noise floor as the 25th percentile
sorted_p = sorted(powers)
noise_floor = sorted_p[len(sorted_p) // 4]
peaks = []
for i in range(1, len(powers) - 1):
if powers[i] > powers[i - 1] and powers[i] > powers[i + 1]:
if powers[i] - noise_floor >= threshold_db:
peaks.append((freqs[i], powers[i], i))
return peaks
def if_to_rf(if_mhz: float, lnb_lo: float) -> float:
"""Convert IF frequency to actual RF frequency given LNB LO."""
return if_mhz + lnb_lo
def rf_to_if(rf_mhz: float, lnb_lo: float) -> float:
"""Convert actual RF frequency to IF frequency given LNB LO."""
return rf_mhz - lnb_lo
def signal_bar(pct: float, width: int = 40) -> str:
"""Render an ASCII signal strength bar."""
filled = int(pct / 100 * width)
filled = max(0, min(filled, width))
bar = '#' * filled + '-' * (width - filled)
return f"[{bar}] {pct:.1f}%"
def format_config_bits(status: int) -> list:
"""Return list of (bit_name, is_set) tuples for config byte."""
result = []
for bit, (name, _field) in CONFIG_BITS.items():
result.append((name, bool(status & bit)))
return result
# --- SkyWalker1 USB interface ---
class SkyWalker1:
"""USB interface to the Genpix SkyWalker-1 DVB-S receiver."""
def __init__(self, verbose: bool = False):
self.dev = None
self.detached_intf = None
self.verbose = verbose
def open(self) -> None:
"""Find and claim the SkyWalker-1 USB device."""
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if self.dev is None:
print("SkyWalker-1 not found (VID 0x09C0, PID 0x0203). Is it plugged in?")
sys.exit(1)
for cfg in self.dev:
for intf in cfg:
if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
try:
self.dev.detach_kernel_driver(intf.bInterfaceNumber)
self.detached_intf = intf.bInterfaceNumber
if self.verbose:
print(f" Detached kernel driver from interface {intf.bInterfaceNumber}")
except usb.core.USBError as e:
print(f"Cannot detach kernel driver: {e}")
print("The gp8psk module must be unbound first. Try one of:")
print(" sudo modprobe -r dvb_usb_gp8psk")
print(" echo '<bus-path>' | sudo tee /sys/bus/usb/drivers/gp8psk/unbind")
sys.exit(1)
try:
self.dev.set_configuration()
except usb.core.USBError:
pass
def close(self) -> None:
"""Release device and re-attach kernel driver."""
if self.dev is None:
return
if self.detached_intf is not None:
try:
usb.util.release_interface(self.dev, self.detached_intf)
self.dev.attach_kernel_driver(self.detached_intf)
if self.verbose:
print("Re-attached kernel driver")
except usb.core.USBError:
print("Note: run 'sudo modprobe dvb_usb_gp8psk' to reload driver")
def __enter__(self):
self.open()
return self
def __exit__(self, *exc):
self.close()
# -- Low-level USB transfers --
def _vendor_in(self, request: int, value: int = 0, index: int = 0,
length: int = 64, retries: int = 3) -> bytes:
"""Vendor IN control transfer (device-to-host), with retry."""
for attempt in range(retries):
try:
data = self.dev.ctrl_transfer(
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
request, value, index, length, 2000
)
if self.verbose:
raw = bytes(data).hex(' ')
print(f" USB IN req=0x{request:02X} val=0x{value:04X} "
f"idx=0x{index:04X} -> [{len(data)}] {raw}")
if len(data) == length:
return bytes(data)
if self.verbose:
print(f" Partial read ({len(data)}/{length}), retry {attempt + 1}")
continue
except usb.core.USBError as e:
if self.verbose:
print(f" USB IN req=0x{request:02X} FAILED: {e}")
if attempt == retries - 1:
raise
return bytes(data)
def _vendor_out(self, request: int, value: int = 0, index: int = 0,
data: bytes = b'') -> int:
"""Vendor OUT control transfer (host-to-device)."""
if self.verbose:
raw = data.hex(' ') if data else "(no data)"
print(f" USB OUT req=0x{request:02X} val=0x{value:04X} "
f"idx=0x{index:04X} data=[{len(data)}] {raw}")
return self.dev.ctrl_transfer(
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT,
request, value, index, data, 2000
)
# -- Device info --
def get_config(self) -> int:
"""Read 8PSK config status byte."""
data = self._vendor_in(CMD_GET_8PSK_CONFIG, length=1)
return data[0]
def get_fw_version(self) -> dict:
"""Read firmware version. Returns dict with version string and date."""
data = self._vendor_in(CMD_GET_FW_VERS, length=6)
return {
"major": data[2],
"minor": data[1],
"patch": data[0],
"version": f"{data[2]}.{data[1]:02d}.{data[0]}",
"date": f"20{data[5]:02d}-{data[4]:02d}-{data[3]:02d}",
}
def get_signal_lock(self) -> bool:
"""Read signal lock status."""
data = self._vendor_in(CMD_GET_SIGNAL_LOCK, length=1)
return data[0] != 0
def get_signal_strength(self) -> dict:
"""Read signal strength. Returns SNR info dict."""
data = self._vendor_in(CMD_GET_SIGNAL_STRENGTH, length=6)
snr_raw = struct.unpack_from('<H', data, 0)[0]
snr_db = snr_raw_to_db(snr_raw)
snr_pct = snr_raw_to_pct(snr_raw)
return {
"snr_raw": snr_raw,
"snr_db": snr_db,
"snr_pct": snr_pct,
"raw_bytes": bytes(data).hex(' '),
}
# -- Power and boot --
def boot(self, on: bool = True) -> int:
"""Power on/off the 8PSK demodulator."""
data = self._vendor_in(CMD_BOOT_8PSK, value=int(on), length=1)
return data[0]
def start_intersil(self, on: bool = True) -> int:
"""Enable/disable LNB power supply."""
data = self._vendor_in(CMD_START_INTERSIL, value=int(on), length=1)
return data[0]
def set_lnb_voltage(self, high: bool) -> None:
"""Set LNB voltage: high=True for 18V, False for 13V."""
self._vendor_out(CMD_SET_LNB_VOLTAGE, value=int(high))
def set_22khz_tone(self, on: bool) -> None:
"""Enable/disable 22 kHz tone."""
self._vendor_out(CMD_SET_22KHZ_TONE, value=int(on))
def set_extra_voltage(self, on: bool) -> None:
"""Enable +1V LNB boost: 13->14V, 18->19V."""
self._vendor_out(CMD_USE_EXTRA_VOLT, value=int(on))
# -- Tuning --
def tune(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int) -> None:
"""Send TUNE_8PSK with 10-byte payload."""
payload = struct.pack('<II', symbol_rate_sps, freq_khz)
payload += bytes([mod_index, fec_index])
self._vendor_out(CMD_TUNE_8PSK, data=payload)
# -- Streaming --
def arm_transfer(self, on: bool) -> None:
"""Start/stop MPEG-2 transport stream."""
self._vendor_out(CMD_ARM_TRANSFER, value=int(on))
def read_stream(self, size: int = EP2_URB_SIZE,
timeout: int = 1000) -> bytes:
"""Read a chunk from the TS bulk endpoint."""
try:
data = self.dev.read(EP2_ADDR, size, timeout)
return bytes(data)
except usb.core.USBTimeoutError:
return b''
except usb.core.USBError as e:
if self.verbose:
print(f" EP2 read error: {e}")
return b''
# -- DiSEqC --
def send_diseqc_tone_burst(self, mini_cmd: int) -> None:
"""Send tone burst (mini-DiSEqC). 0=SEC_MINI_A, 1=SEC_MINI_B."""
self._vendor_out(CMD_SEND_DISEQC, value=mini_cmd)
def send_diseqc_message(self, msg: bytes) -> None:
"""Send full DiSEqC message (3-6 bytes)."""
if len(msg) < 3 or len(msg) > 6:
raise ValueError(f"DiSEqC message must be 3-6 bytes, got {len(msg)}")
self._vendor_out(CMD_SEND_DISEQC, value=msg[0], data=msg)
# -- New commands (v3.02+) --
def signal_monitor(self) -> dict:
"""
Fast combined signal read (0xB7). Returns 8 bytes in one transfer:
SNR(2) + AGC1(2) + AGC2(2) + lock(1) + status(1).
"""
data = self._vendor_in(CMD_SIGNAL_MONITOR, length=8)
snr_raw = struct.unpack_from('<H', data, 0)[0]
agc1 = struct.unpack_from('<H', data, 2)[0]
agc2 = struct.unpack_from('<H', data, 4)[0]
return {
"snr_raw": snr_raw,
"snr_db": snr_raw_to_db(snr_raw),
"snr_pct": snr_raw_to_pct(snr_raw),
"agc1": agc1,
"agc2": agc2,
"power_db": agc_to_power_db(agc1, agc2),
"lock": data[6],
"locked": bool(data[6] & 0x20),
"status": data[7],
}
def tune_monitor(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int,
dwell_ms: int = 10) -> dict:
"""
Tune + dwell + signal read in one round-trip (0xB8).
Sends tune parameters via OUT phase, firmware tunes + waits
dwell_ms + reads signal. Then IN phase returns the result.
"""
dwell_ms = max(1, min(255, dwell_ms))
payload = struct.pack('<II', symbol_rate_sps, freq_khz)
payload += bytes([mod_index, fec_index])
# OUT phase: send tune data, firmware does tune + dwell + read
self._vendor_out(CMD_TUNE_MONITOR, value=dwell_ms, data=payload)
# IN phase: read stored result
data = self._vendor_in(CMD_TUNE_MONITOR, length=10)
snr_raw = struct.unpack_from('<H', data, 0)[0]
agc1 = struct.unpack_from('<H', data, 2)[0]
agc2 = struct.unpack_from('<H', data, 4)[0]
return {
"snr_raw": snr_raw,
"snr_db": snr_raw_to_db(snr_raw),
"agc1": agc1,
"agc2": agc2,
"power_db": agc_to_power_db(agc1, agc2),
"lock": data[6],
"locked": bool(data[6] & 0x20),
"status": data[7],
"dwell_ms": struct.unpack_from('<H', data, 8)[0],
}
def multi_reg_read(self, start_reg: int, count: int) -> bytes:
"""
Batch read of contiguous BCM4500 indirect registers (0xB9).
Returns count bytes, one per register. Up to 64 registers
in a single USB transfer (vs. individual 0xB1 reads).
"""
count = max(1, min(64, count))
data = self._vendor_in(CMD_MULTI_REG_READ, value=start_reg,
index=count, length=count)
return bytes(data)
# -- High-level sweep helpers --
def sweep_spectrum(self, start_mhz: float, stop_mhz: float,
step_mhz: float = 5.0, dwell_ms: int = 10,
sr_ksps: int = 20000, mod_index: int = 0,
fec_index: int = 5,
callback=None) -> tuple:
"""
Sweep a frequency range and return power measurements.
Uses TUNE_MONITOR (0xB8) at each step for efficient measurement.
Default tune params: QPSK, auto-FEC, 20 Msps.
callback(freq_mhz, step_num, total_steps, result) is called
per step if provided.
Returns (freqs_mhz[], powers_db[], raw_results[]).
"""
sr_sps = sr_ksps * 1000
freqs = []
powers = []
results = []
freq = start_mhz
steps = int((stop_mhz - start_mhz) / step_mhz) + 1
step_num = 0
while freq <= stop_mhz:
freq_khz = int(freq * 1000)
result = self.tune_monitor(sr_sps, freq_khz, mod_index,
fec_index, dwell_ms)
freqs.append(freq)
powers.append(result["power_db"])
results.append(result)
if callback:
callback(freq, step_num, steps, result)
step_num += 1
freq += step_mhz
return freqs, powers, results
def ensure_booted(self) -> None:
"""Boot demodulator and enable LNB power if not already running."""
status = self.get_config()
if not (status & 0x01):
self.boot(on=True)
time.sleep(0.5)
status = self.get_config()
if not (status & 0x01):
raise RuntimeError("Device failed to start")
if not (status & 0x04):
self.start_intersil(on=True)
time.sleep(0.3)
def configure_lnb(self, pol: str = None, band: str = None,
lnb_lo: float = None, disable_lnb: bool = False) -> float:
"""
Configure LNB voltage, tone, and return the effective LO frequency.
pol: 'H'/'V'/'L'/'R' or None (don't change)
band: 'low'/'high' or None (don't change)
lnb_lo: explicit LO freq in MHz, or None for auto
disable_lnb: True to disable LNB power (for direct input)
"""
if disable_lnb:
self.start_intersil(on=False)
return 0.0
if pol:
high_voltage = pol.upper() in ("H", "L")
self.set_lnb_voltage(high_voltage)
if band:
self.set_22khz_tone(band == "high")
if lnb_lo is not None:
return lnb_lo
elif band == "high":
return LNB_LO_HIGH
else:
return LNB_LO_LOW

View File

@ -18,316 +18,18 @@ import json
import signal
import os
try:
# Add tools directory to path for library import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from skywalker_lib import (
SkyWalker1, VENDOR_ID, PRODUCT_ID, EP2_URB_SIZE,
MODULATIONS, FEC_RATES, MOD_FEC_GROUP,
LNB_LO_LOW, LNB_LO_HIGH,
CONFIG_BITS,
signal_bar, format_config_bits,
)
import usb.core
import usb.util
except ImportError:
print("pyusb required: pip install pyusb")
sys.exit(1)
VENDOR_ID = 0x09C0
PRODUCT_ID = 0x0203
# Streaming endpoint
EP2_ADDR = 0x82
EP2_URB_SIZE = 8192
# Vendor commands
CMD_GET_8PSK_CONFIG = 0x80
CMD_I2C_WRITE = 0x83
CMD_I2C_READ = 0x84
CMD_ARM_TRANSFER = 0x85
CMD_TUNE_8PSK = 0x86
CMD_GET_SIGNAL_STRENGTH = 0x87
CMD_LOAD_BCM4500 = 0x88
CMD_BOOT_8PSK = 0x89
CMD_START_INTERSIL = 0x8A
CMD_SET_LNB_VOLTAGE = 0x8B
CMD_SET_22KHZ_TONE = 0x8C
CMD_SEND_DISEQC = 0x8D
CMD_GET_SIGNAL_LOCK = 0x90
CMD_GET_FW_VERS = 0x92
CMD_GET_SERIAL_NUMBER = 0x93
CMD_USE_EXTRA_VOLT = 0x94
# Config status bits (GET_8PSK_CONFIG response)
CONFIG_BITS = {
0x01: ("8PSK Started", "bm8pskStarted"),
0x02: ("BCM4500 FW Loaded", "bm8pskFW_Loaded"),
0x04: ("LNB Power On", "bmIntersilOn"),
0x08: ("DVB Mode", "bmDVBmode"),
0x10: ("22 kHz Tone", "bm22kHz"),
0x20: ("18V Selected", "bmSEL18V"),
0x40: ("DC Tuned", "bmDCtuned"),
0x80: ("Armed (streaming)", "bmArmed"),
}
# Modulation types for TUNE_8PSK byte 8
MODULATIONS = {
"qpsk": (0, "DVB-S QPSK"),
"turbo-qpsk": (1, "Turbo QPSK"),
"turbo-8psk": (2, "Turbo 8PSK"),
"turbo-16qam": (3, "Turbo 16QAM"),
"dcii-combo": (4, "DCII Combo"),
"dcii-i": (5, "DCII I-stream"),
"dcii-q": (6, "DCII Q-stream"),
"dcii-oqpsk": (7, "DCII Offset QPSK"),
"dss": (8, "DSS QPSK"),
"bpsk": (9, "DVB BPSK"),
}
# FEC rate indices per modulation group
FEC_RATES = {
"dvbs": {
"1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3,
"7/8": 4, "auto": 5, "none": 6,
},
"turbo": {
"1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, "auto": 4,
},
"turbo-16qam": {
"3/4": 0, "auto": 0,
},
"dcii": {
"1/2": 0, "2/3": 1, "6/7": 2, "3/4": 3, "5/11": 4,
"1/2+": 5, "2/3+": 6, "6/7+": 7, "3/4+": 8, "auto": 0,
},
}
# Map modulation names to FEC group
MOD_FEC_GROUP = {
"qpsk": "dvbs",
"turbo-qpsk": "turbo",
"turbo-8psk": "turbo",
"turbo-16qam": "turbo-16qam",
"dcii-combo": "dcii",
"dcii-i": "dcii",
"dcii-q": "dcii",
"dcii-oqpsk": "dcii",
"dss": "dvbs",
"bpsk": "dvbs",
}
# Default LNB LO frequencies (MHz)
LNB_LO_LOW = 9750 # Universal LNB low-band
LNB_LO_HIGH = 10600 # Universal LNB high-band
class SkyWalker1:
"""USB interface to the Genpix SkyWalker-1 DVB-S receiver."""
def __init__(self, verbose: bool = False):
self.dev = None
self.detached_intf = None
self.verbose = verbose
def open(self) -> None:
"""Find and claim the SkyWalker-1 USB device."""
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if self.dev is None:
print("SkyWalker-1 not found (VID 0x09C0, PID 0x0203). Is it plugged in?")
sys.exit(1)
# Detach kernel driver if bound
for cfg in self.dev:
for intf in cfg:
if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
try:
self.dev.detach_kernel_driver(intf.bInterfaceNumber)
self.detached_intf = intf.bInterfaceNumber
if self.verbose:
print(f" Detached kernel driver from interface {intf.bInterfaceNumber}")
except usb.core.USBError as e:
print(f"Cannot detach kernel driver: {e}")
print("The gp8psk module must be unbound first. Try one of:")
print(" sudo modprobe -r dvb_usb_gp8psk")
print(" echo '<bus-path>' | sudo tee /sys/bus/usb/drivers/gp8psk/unbind")
sys.exit(1)
try:
self.dev.set_configuration()
except usb.core.USBError:
pass # May already be configured
def close(self) -> None:
"""Release device and re-attach kernel driver."""
if self.dev is None:
return
if self.detached_intf is not None:
try:
usb.util.release_interface(self.dev, self.detached_intf)
self.dev.attach_kernel_driver(self.detached_intf)
if self.verbose:
print("Re-attached kernel driver")
except usb.core.USBError:
print("Note: run 'sudo modprobe dvb_usb_gp8psk' to reload driver")
def __enter__(self):
self.open()
return self
def __exit__(self, *exc):
self.close()
# -- Low-level USB transfers --
def _vendor_in(self, request: int, value: int = 0, index: int = 0,
length: int = 64, retries: int = 3) -> bytes:
"""Vendor IN control transfer (device-to-host), with retry."""
for attempt in range(retries):
try:
data = self.dev.ctrl_transfer(
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
request, value, index, length, 2000
)
if self.verbose:
raw = bytes(data).hex(' ')
print(f" USB IN req=0x{request:02X} val=0x{value:04X} "
f"idx=0x{index:04X} -> [{len(data)}] {raw}")
if len(data) == length:
return bytes(data)
# Partial read, retry
if self.verbose:
print(f" Partial read ({len(data)}/{length}), retry {attempt + 1}")
continue
except usb.core.USBError as e:
if self.verbose:
print(f" USB IN req=0x{request:02X} FAILED: {e}")
if attempt == retries - 1:
raise
return bytes(data)
def _vendor_out(self, request: int, value: int = 0, index: int = 0,
data: bytes = b'') -> int:
"""Vendor OUT control transfer (host-to-device)."""
if self.verbose:
raw = data.hex(' ') if data else "(no data)"
print(f" USB OUT req=0x{request:02X} val=0x{value:04X} "
f"idx=0x{index:04X} data=[{len(data)}] {raw}")
return self.dev.ctrl_transfer(
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT,
request, value, index, data, 2000
)
# -- Device info commands --
def get_config(self) -> int:
"""Read 8PSK config status byte (GET_8PSK_CONFIG 0x80)."""
data = self._vendor_in(CMD_GET_8PSK_CONFIG, length=1)
return data[0]
def get_fw_version(self) -> dict:
"""Read firmware version (GET_FW_VERS 0x92). Returns dict."""
data = self._vendor_in(CMD_GET_FW_VERS, length=6)
return {
"major": data[2],
"minor": data[1],
"patch": data[0],
"version": f"{data[2]}.{data[1]:02d}.{data[0]}",
"date": f"20{data[5]:02d}-{data[4]:02d}-{data[3]:02d}",
}
def get_signal_lock(self) -> bool:
"""Read signal lock status (GET_SIGNAL_LOCK 0x90)."""
data = self._vendor_in(CMD_GET_SIGNAL_LOCK, length=1)
return data[0] != 0
def get_signal_strength(self) -> dict:
"""Read signal strength (GET_SIGNAL_STRENGTH 0x87). Returns SNR info."""
data = self._vendor_in(CMD_GET_SIGNAL_STRENGTH, length=6)
snr_raw = struct.unpack_from('<H', data, 0)[0]
# SNR is in dBu * 256 units. Scale: snr * 17 maps to 0-65535.
snr_scaled = min(snr_raw * 17, 65535)
snr_pct = (snr_scaled / 65535) * 100
snr_db = snr_raw / 256.0
return {
"snr_raw": snr_raw,
"snr_db": snr_db,
"snr_pct": snr_pct,
"raw_bytes": bytes(data).hex(' '),
}
# -- Power and boot commands --
def boot(self, on: bool = True) -> int:
"""Power on/off the 8PSK demodulator (BOOT_8PSK 0x89)."""
data = self._vendor_in(CMD_BOOT_8PSK, value=int(on), length=1)
return data[0]
def start_intersil(self, on: bool = True) -> int:
"""Enable/disable LNB power supply (START_INTERSIL 0x8A)."""
data = self._vendor_in(CMD_START_INTERSIL, value=int(on), length=1)
return data[0]
def set_lnb_voltage(self, high: bool) -> None:
"""Set LNB voltage: high=True for 18V (H/L), high=False for 13V (V/R)."""
self._vendor_out(CMD_SET_LNB_VOLTAGE, value=int(high))
def set_22khz_tone(self, on: bool) -> None:
"""Enable/disable 22 kHz tone (SET_22KHZ_TONE 0x8C)."""
self._vendor_out(CMD_SET_22KHZ_TONE, value=int(on))
def set_extra_voltage(self, on: bool) -> None:
"""Enable +1V LNB boost: 13->14V, 18->19V (USE_EXTRA_VOLT 0x94)."""
self._vendor_out(CMD_USE_EXTRA_VOLT, value=int(on))
# -- Tuning --
def tune(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int) -> None:
"""Send TUNE_8PSK (0x86) with 10-byte payload."""
payload = struct.pack('<II', symbol_rate_sps, freq_khz)
payload += bytes([mod_index, fec_index])
self._vendor_out(CMD_TUNE_8PSK, data=payload)
# -- Streaming --
def arm_transfer(self, on: bool) -> None:
"""Start/stop MPEG-2 transport stream (ARM_TRANSFER 0x85)."""
self._vendor_out(CMD_ARM_TRANSFER, value=int(on))
def read_stream(self, size: int = EP2_URB_SIZE,
timeout: int = 1000) -> bytes:
"""Read a chunk from the TS bulk endpoint (EP2 0x82)."""
try:
data = self.dev.read(EP2_ADDR, size, timeout)
return bytes(data)
except usb.core.USBTimeoutError:
return b''
except usb.core.USBError as e:
if self.verbose:
print(f" EP2 read error: {e}")
return b''
# -- DiSEqC --
def send_diseqc_tone_burst(self, mini_cmd: int) -> None:
"""Send tone burst (mini-DiSEqC). 0=SEC_MINI_A, 1=SEC_MINI_B."""
self._vendor_out(CMD_SEND_DISEQC, value=mini_cmd)
def send_diseqc_message(self, msg: bytes) -> None:
"""Send full DiSEqC message (3-6 bytes). wValue = framing byte."""
if len(msg) < 3 or len(msg) > 6:
raise ValueError(f"DiSEqC message must be 3-6 bytes, got {len(msg)}")
self._vendor_out(CMD_SEND_DISEQC, value=msg[0], data=msg)
# -- Signal bar rendering --
def signal_bar(pct: float, width: int = 40) -> str:
"""Render a signal strength bar."""
filled = int(pct / 100 * width)
filled = max(0, min(filled, width))
bar = '#' * filled + '-' * (width - filled)
return f"[{bar}] {pct:.1f}%"
def format_config_bits(status: int) -> list:
"""Return list of (bit_name, is_set) tuples for config byte."""
result = []
for bit, (name, _field) in CONFIG_BITS.items():
result.append((name, bool(status & bit)))
return result
# -- Subcommand handlers --