skywalker-1/tools/tune.py
Ryan Malloy a2845c37fb Add SkyWalker-1 tuning tool and consolidated hardware reference
Python tool (tools/tune.py) implements all vendor USB control
commands for tuning, LNB control, DiSEqC switching, and MPEG-2
transport stream capture via pyusb. Includes CLI subcommands for
status, tune, stream, diseqc, and lnb operations.

Consolidated hardware reference merges all Phase 1 analysis into
a single 12-section document covering the complete USB interface,
all 30 vendor commands, BCM4500 demodulator protocol, GPIF
streaming path, DiSEqC timing, and cross-version firmware
comparison.
2026-02-11 12:30:05 -07:00

835 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Genpix SkyWalker-1 DVB-S tuning and streaming tool.
Controls the SkyWalker-1 USB DVB-S satellite receiver via vendor USB
control transfers. Supports tuning, LNB control, DiSEqC switching,
MPEG-2 transport stream capture, and signal monitoring.
Hardware: Cypress FX2 (CY7C68013A) + Broadcom BCM4500 demodulator
USB: VID 0x09C0, PID 0x0203, EP2 bulk IN for TS data
"""
import argparse
import errno
import json
import os
import signal
import struct
import sys
import time
try:
import usb.core
import usb.util
except ImportError:
print("pyusb required: pip install pyusb")
sys.exit(1)
# USB identity used by SkyWalker1.open() to locate the device.
VENDOR_ID = 0x09C0
PRODUCT_ID = 0x0203
# Streaming endpoint
EP2_ADDR = 0x82  # endpoint address passed to dev.read() in read_stream()
EP2_URB_SIZE = 8192  # default number of bytes requested per bulk read
# Vendor commands
# Request codes passed as the `request` argument of the vendor control
# transfers. Codes with no caller in this file as shown (I2C_WRITE, I2C_READ,
# LOAD_BCM4500, GET_SERIAL_NUMBER) are kept for reference.
CMD_GET_8PSK_CONFIG = 0x80
CMD_I2C_WRITE = 0x83
CMD_I2C_READ = 0x84
CMD_ARM_TRANSFER = 0x85
CMD_TUNE_8PSK = 0x86
CMD_GET_SIGNAL_STRENGTH = 0x87
CMD_LOAD_BCM4500 = 0x88
CMD_BOOT_8PSK = 0x89
CMD_START_INTERSIL = 0x8A
CMD_SET_LNB_VOLTAGE = 0x8B
CMD_SET_22KHZ_TONE = 0x8C
CMD_SEND_DISEQC = 0x8D
CMD_GET_SIGNAL_LOCK = 0x90
CMD_GET_FW_VERS = 0x92
CMD_GET_SERIAL_NUMBER = 0x93
CMD_USE_EXTRA_VOLT = 0x94
# Config status bits (GET_8PSK_CONFIG response)
# Maps bit mask -> (human-readable label, field name). The label is shown by
# the status command; the field name is used as the key in its JSON output.
CONFIG_BITS = {
    0x01: ("8PSK Started", "bm8pskStarted"),
    0x02: ("BCM4500 FW Loaded", "bm8pskFW_Loaded"),
    0x04: ("LNB Power On", "bmIntersilOn"),
    0x08: ("DVB Mode", "bmDVBmode"),
    0x10: ("22 kHz Tone", "bm22kHz"),
    0x20: ("18V Selected", "bmSEL18V"),
    0x40: ("DC Tuned", "bmDCtuned"),
    0x80: ("Armed (streaming)", "bmArmed"),
}
# Modulation types for TUNE_8PSK byte 8
# Maps CLI name (the --mod choices) -> (modulation index, description).
MODULATIONS = {
    "qpsk": (0, "DVB-S QPSK"),
    "turbo-qpsk": (1, "Turbo QPSK"),
    "turbo-8psk": (2, "Turbo 8PSK"),
    "turbo-16qam": (3, "Turbo 16QAM"),
    "dcii-combo": (4, "DCII Combo"),
    "dcii-i": (5, "DCII I-stream"),
    "dcii-q": (6, "DCII Q-stream"),
    "dcii-oqpsk": (7, "DCII Offset QPSK"),
    "dss": (8, "DSS QPSK"),
    "bpsk": (9, "DVB BPSK"),
}
# FEC rate indices per modulation group
# Outer key is a group name from MOD_FEC_GROUP; inner dict maps the --fec
# string to the index sent as the last byte of the TUNE_8PSK payload.
FEC_RATES = {
    "dvbs": {
        "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3,
        "7/8": 4, "auto": 5, "none": 6,
    },
    "turbo": {
        "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, "auto": 4,
    },
    "turbo-16qam": {
        "3/4": 0, "auto": 0,
    },
    "dcii": {
        "1/2": 0, "2/3": 1, "6/7": 2, "3/4": 3, "5/11": 4,
        "1/2+": 5, "2/3+": 6, "6/7+": 7, "3/4+": 8, "auto": 0,
    },
}
# Map modulation names to FEC group
# Every key of MODULATIONS has an entry here, so cmd_tune can index this
# table directly once the modulation name has been validated.
MOD_FEC_GROUP = {
    "qpsk": "dvbs",
    "turbo-qpsk": "turbo",
    "turbo-8psk": "turbo",
    "turbo-16qam": "turbo-16qam",
    "dcii-combo": "dcii",
    "dcii-i": "dcii",
    "dcii-q": "dcii",
    "dcii-oqpsk": "dcii",
    "dss": "dvbs",
    "bpsk": "dvbs",
}
# Default LNB LO frequencies (MHz)
# Subtracted from the downlink frequency in cmd_tune to obtain the IF.
LNB_LO_LOW = 9750 # Universal LNB low-band
LNB_LO_HIGH = 10600 # Universal LNB high-band
class SkyWalker1:
    """USB interface to the Genpix SkyWalker-1 DVB-S receiver.

    Each device operation is either a vendor control transfer (one of the
    CMD_* request codes) or a bulk read from the transport-stream endpoint.
    Instances are context managers: ``open()`` runs on entry and ``close()``
    on exit.

    Diagnostics (verbose USB traces and error notes) are written to stderr
    so they can never be interleaved with data a caller writes to stdout,
    such as a piped transport stream.
    """

    def __init__(self, verbose: bool = False):
        """Create an unopened handle; ``verbose`` enables raw USB tracing."""
        self.dev = None
        # Most recently detached interface number (kept for compatibility).
        self.detached_intf = None
        # Every interface whose kernel driver open() detached, so close()
        # can give all of them back instead of only the last one.
        self._detached_intfs = []
        self.verbose = verbose

    @staticmethod
    def _trace(message: str) -> None:
        """Write one diagnostic line to stderr."""
        print(message, file=sys.stderr)

    def open(self) -> None:
        """Find and claim the SkyWalker-1 USB device.

        Exits the process with status 1 if the device is not present or a
        bound kernel driver cannot be detached.
        """
        self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
        if self.dev is None:
            self._trace("SkyWalker-1 not found (VID 0x09C0, PID 0x0203). Is it plugged in?")
            sys.exit(1)

        # Detach kernel driver if bound
        for cfg in self.dev:
            for intf in cfg:
                number = intf.bInterfaceNumber
                if not self.dev.is_kernel_driver_active(number):
                    continue
                try:
                    self.dev.detach_kernel_driver(number)
                except usb.core.USBError as e:
                    self._trace(f"Cannot detach kernel driver: {e}")
                    self._trace("The gp8psk module must be unbound first. Try one of:")
                    self._trace(" sudo modprobe -r dvb_usb_gp8psk")
                    self._trace(" echo '<bus-path>' | sudo tee /sys/bus/usb/drivers/gp8psk/unbind")
                    sys.exit(1)
                self.detached_intf = number
                self._detached_intfs.append(number)
                if self.verbose:
                    self._trace(f" Detached kernel driver from interface {number}")

        try:
            self.dev.set_configuration()
        except usb.core.USBError as e:
            # Deliberately best-effort: the device may already be configured.
            if self.verbose:
                self._trace(f" set_configuration skipped: {e}")

    def close(self) -> None:
        """Release the device and re-attach any kernel driver open() detached.

        Safe to call more than once; later calls are no-ops.
        """
        if self.dev is None:
            return
        try:
            for number in self._detached_intfs:
                try:
                    usb.util.release_interface(self.dev, number)
                    self.dev.attach_kernel_driver(number)
                    if self.verbose:
                        self._trace("Re-attached kernel driver")
                except usb.core.USBError:
                    self._trace("Note: run 'sudo modprobe dvb_usb_gp8psk' to reload driver")
        finally:
            # Free the backend handle and forget the device so a second
            # close() cannot try to re-attach again.
            usb.util.dispose_resources(self.dev)
            self._detached_intfs = []
            self.detached_intf = None
            self.dev = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *exc):
        self.close()

    # -- Low-level USB transfers --

    def _vendor_in(self, request: int, value: int = 0, index: int = 0,
                   length: int = 64, retries: int = 3) -> bytes:
        """Vendor IN control transfer (device-to-host), with retry.

        Retries on a transfer error or a short read. A transfer error on the
        final attempt is re-raised as usb.core.USBError. If the final attempt
        is only a short read, the partial data is returned. At least one
        attempt is always made, even when ``retries`` is zero or negative.
        """
        attempts = max(1, retries)
        data = b''
        for attempt in range(attempts):
            try:
                data = self.dev.ctrl_transfer(
                    usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
                    request, value, index, length, 2000
                )
            except usb.core.USBError as e:
                if self.verbose:
                    self._trace(f" USB IN req=0x{request:02X} FAILED: {e}")
                if attempt == attempts - 1:
                    raise
                continue
            if self.verbose:
                raw = bytes(data).hex(' ')
                self._trace(f" USB IN req=0x{request:02X} val=0x{value:04X} "
                            f"idx=0x{index:04X} -> [{len(data)}] {raw}")
            if len(data) == length:
                return bytes(data)
            # Partial read, retry
            if self.verbose:
                self._trace(f" Partial read ({len(data)}/{length}), retry {attempt + 1}")
        return bytes(data)

    def _vendor_out(self, request: int, value: int = 0, index: int = 0,
                    data: bytes = b'') -> int:
        """Vendor OUT control transfer (host-to-device).

        Returns whatever ``ctrl_transfer`` returns for the write.
        """
        if self.verbose:
            raw = data.hex(' ') if data else "(no data)"
            self._trace(f" USB OUT req=0x{request:02X} val=0x{value:04X} "
                        f"idx=0x{index:04X} data=[{len(data)}] {raw}")
        return self.dev.ctrl_transfer(
            usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT,
            request, value, index, data, 2000
        )

    # -- Device info commands --

    def get_config(self) -> int:
        """Read 8PSK config status byte (GET_8PSK_CONFIG 0x80)."""
        data = self._vendor_in(CMD_GET_8PSK_CONFIG, length=1)
        return data[0]

    def get_fw_version(self) -> dict:
        """Read firmware version (GET_FW_VERS 0x92). Returns dict.

        The 6-byte reply is decoded as patch, minor, major, day, month and
        two-digit year, in that byte order.
        """
        data = self._vendor_in(CMD_GET_FW_VERS, length=6)
        return {
            "major": data[2],
            "minor": data[1],
            "patch": data[0],
            "version": f"{data[2]}.{data[1]:02d}.{data[0]}",
            "date": f"20{data[5]:02d}-{data[4]:02d}-{data[3]:02d}",
        }

    def get_signal_lock(self) -> bool:
        """Read signal lock status (GET_SIGNAL_LOCK 0x90)."""
        data = self._vendor_in(CMD_GET_SIGNAL_LOCK, length=1)
        return data[0] != 0

    def get_signal_strength(self) -> dict:
        """Read signal strength (GET_SIGNAL_STRENGTH 0x87). Returns SNR info.

        Keys: ``snr_raw`` (first two bytes, little-endian), ``snr_db``,
        ``snr_pct`` (0-100) and ``raw_bytes`` (hex dump of the reply).
        """
        data = self._vendor_in(CMD_GET_SIGNAL_STRENGTH, length=6)
        snr_raw = struct.unpack_from('<H', data, 0)[0]
        # SNR is in dBu * 256 units. Scale: snr * 17 maps to 0-65535.
        snr_scaled = min(snr_raw * 17, 65535)
        snr_pct = (snr_scaled / 65535) * 100
        snr_db = snr_raw / 256.0
        return {
            "snr_raw": snr_raw,
            "snr_db": snr_db,
            "snr_pct": snr_pct,
            "raw_bytes": bytes(data).hex(' '),
        }

    # -- Power and boot commands --

    def boot(self, on: bool = True) -> int:
        """Power on/off the 8PSK demodulator (BOOT_8PSK 0x89)."""
        data = self._vendor_in(CMD_BOOT_8PSK, value=int(on), length=1)
        return data[0]

    def start_intersil(self, on: bool = True) -> int:
        """Enable/disable LNB power supply (START_INTERSIL 0x8A)."""
        data = self._vendor_in(CMD_START_INTERSIL, value=int(on), length=1)
        return data[0]

    def set_lnb_voltage(self, high: bool) -> None:
        """Set LNB voltage: high=True for 18V (H/L), high=False for 13V (V/R)."""
        self._vendor_out(CMD_SET_LNB_VOLTAGE, value=int(high))

    def set_22khz_tone(self, on: bool) -> None:
        """Enable/disable 22 kHz tone (SET_22KHZ_TONE 0x8C)."""
        self._vendor_out(CMD_SET_22KHZ_TONE, value=int(on))

    def set_extra_voltage(self, on: bool) -> None:
        """Enable +1V LNB boost: 13->14V, 18->19V (USE_EXTRA_VOLT 0x94)."""
        self._vendor_out(CMD_USE_EXTRA_VOLT, value=int(on))

    # -- Tuning --

    def tune(self, symbol_rate_sps: int, freq_khz: int,
             mod_index: int, fec_index: int) -> None:
        """Send TUNE_8PSK (0x86) with 10-byte payload.

        Payload layout: symbol rate and frequency as little-endian unsigned
        32-bit integers, then the modulation index and the FEC index as
        single bytes.
        """
        payload = struct.pack('<II', symbol_rate_sps, freq_khz)
        payload += bytes([mod_index, fec_index])
        self._vendor_out(CMD_TUNE_8PSK, data=payload)

    # -- Streaming --

    def arm_transfer(self, on: bool) -> None:
        """Start/stop MPEG-2 transport stream (ARM_TRANSFER 0x85)."""
        self._vendor_out(CMD_ARM_TRANSFER, value=int(on))

    def read_stream(self, size: int = EP2_URB_SIZE,
                    timeout: int = 1000) -> bytes:
        """Read a chunk from the TS bulk endpoint (EP2 0x82).

        Returns ``b''`` on a timeout or a transient USB error. A "no such
        device" error is re-raised so a caller polling in a loop stops
        instead of spinning forever after the receiver is unplugged.
        """
        try:
            return bytes(self.dev.read(EP2_ADDR, size, timeout))
        except usb.core.USBTimeoutError:
            return b''
        except usb.core.USBError as e:
            if e.errno == errno.ENODEV:
                raise
            if self.verbose:
                self._trace(f" EP2 read error: {e}")
            return b''

    # -- DiSEqC --

    def send_diseqc_tone_burst(self, mini_cmd: int) -> None:
        """Send tone burst (mini-DiSEqC). 0=SEC_MINI_A, 1=SEC_MINI_B."""
        self._vendor_out(CMD_SEND_DISEQC, value=mini_cmd)

    def send_diseqc_message(self, msg: bytes) -> None:
        """Send full DiSEqC message (3-6 bytes). wValue = framing byte.

        Raises ValueError if ``msg`` is shorter than 3 or longer than 6 bytes.
        """
        if len(msg) < 3 or len(msg) > 6:
            raise ValueError(f"DiSEqC message must be 3-6 bytes, got {len(msg)}")
        self._vendor_out(CMD_SEND_DISEQC, value=msg[0], data=msg)
# -- Signal bar rendering --
def signal_bar(pct: float, width: int = 40) -> str:
    """Draw ``pct`` (0-100) as a ``width``-character text gauge.

    The gauge is '#' for the filled part and '-' for the rest, wrapped in
    brackets and followed by the percentage to one decimal place. The fill
    is clamped to the gauge width; the printed number is not.
    """
    cells = max(0, min(int(pct / 100 * width), width))
    gauge = ('#' * cells).ljust(width, '-')
    return "[" + gauge + "] " + format(pct, ".1f") + "%"
def format_config_bits(status: int) -> list:
    """Decode the config byte into ``(label, is_set)`` pairs.

    One pair is produced per CONFIG_BITS entry, in table order; ``is_set``
    is True when that entry's mask bit is present in ``status``.
    """
    return [
        (label, (status & mask) != 0)
        for mask, (label, _field) in CONFIG_BITS.items()
    ]
# -- Subcommand handlers --
def cmd_status(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Show device status, firmware version, signal info.

    Prints a human-readable report. When ``args.json`` is set, a JSON
    document with the same data is printed afterwards. Signal strength is
    read from the device once and reused for both outputs, so the JSON
    cannot disagree with the text report.
    """
    print("Genpix SkyWalker-1 Status")
    print(f"{'=' * 50}")
    print(f"\nUSB: Bus {sw.dev.bus} Addr {sw.dev.address} "
          f"(VID 0x{VENDOR_ID:04X}, PID 0x{PRODUCT_ID:04X})")

    # Firmware version (a failed read is reported, not fatal)
    try:
        fw = sw.get_fw_version()
    except usb.core.USBError:
        fw = None
        print("FW: (read failed)")
    else:
        print(f"FW: {fw['version']} (built {fw['date']})")

    # Config status
    status = sw.get_config()
    print(f"\nConfig: 0x{status:02X}")
    for name, is_set in format_config_bits(status):
        state = "ON" if is_set else "off"
        print(f" [{state:>3}] {name}")

    # Signal lock and strength
    locked = sw.get_signal_lock()
    print(f"\nSignal Lock: {'LOCKED' if locked else 'no lock'}")
    sig = None
    if locked:
        sig = sw.get_signal_strength()
        print(f"SNR: {sig['snr_db']:.1f} dB (raw 0x{sig['snr_raw']:04X})")
        print(f"Quality: {signal_bar(sig['snr_pct'])}")

    if args.json:
        out = {
            "usb": {"bus": sw.dev.bus, "address": sw.dev.address},
            "config": status,
            "config_bits": {field: bool(status & bit)
                            for bit, (_name, field) in CONFIG_BITS.items()},
            "locked": locked,
        }
        if fw:
            out["firmware"] = fw
        if sig is not None:
            out["signal"] = sig
        print(f"\n{json.dumps(out, indent=2)}")
def cmd_tune(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Tune to a transponder.

    Resolves the LNB local oscillator, intermediate frequency, modulation
    and FEC index from ``args``, validating all of them before the hardware
    is touched. It then runs the 8-step sequence: read config, boot the
    demodulator, enable LNB power, set voltage, optional +1V boost, set the
    22 kHz tone, send TUNE_8PSK, and poll for lock until ``args.timeout``
    seconds have elapsed.

    Exits with status 1 on invalid parameters or when the demodulator or
    LNB supply fails to start. A missing lock is reported but is not an
    error exit.
    """
    freq_mhz = args.freq
    sr_ksps = args.sr
    mod_name = args.mod
    fec_name = args.fec
    pol = args.pol.upper() if args.pol else None
    band = args.band

    # Resolve LNB LO. Compare against None so an explicit "--lnb-lo 0"
    # (downlink value already is the IF) is honoured rather than replaced
    # by the universal-LNB default.
    if args.lnb_lo is not None:
        lnb_lo = args.lnb_lo
    elif band == "high":
        lnb_lo = LNB_LO_HIGH
    else:
        lnb_lo = LNB_LO_LOW

    # Compute IF frequency. round() rather than int(): a float difference
    # such as 11766.4 - 9750 is 2016.3999999999996, which int() would
    # truncate to 2016399 kHz instead of 2016400.
    if_mhz = freq_mhz - lnb_lo
    if_khz = round(if_mhz * 1000)
    if if_khz < 950000 or if_khz > 2150000:
        print(f"WARNING: IF frequency {if_mhz} MHz is outside 950-2150 MHz range")
        print(f" Downlink: {freq_mhz} MHz, LNB LO: {lnb_lo} MHz")
        if if_khz < 0:
            print(" IF is negative -- check your LNB LO frequency")
            sys.exit(1)

    # Resolve modulation
    if mod_name not in MODULATIONS:
        print(f"Unknown modulation: {mod_name}")
        print(f"Valid: {', '.join(MODULATIONS.keys())}")
        sys.exit(1)
    mod_index, mod_desc = MODULATIONS[mod_name]

    # Resolve FEC
    fec_group = MOD_FEC_GROUP[mod_name]
    fec_table = FEC_RATES[fec_group]
    if fec_name not in fec_table:
        print(f"Invalid FEC '{fec_name}' for {mod_desc}")
        print(f"Valid: {', '.join(fec_table.keys())}")
        sys.exit(1)
    fec_index = fec_table[fec_name]

    # The symbol rate is packed as an unsigned 32-bit value; reject anything
    # that cannot be encoded before powering up the demodulator or LNB.
    sr_sps = sr_ksps * 1000
    if sr_sps <= 0 or sr_sps > 0xFFFFFFFF:
        print(f"Invalid symbol rate: {sr_ksps} ksps")
        sys.exit(1)

    print("Tuning SkyWalker-1")
    print(f"{'=' * 50}")
    print(f" Downlink: {freq_mhz} MHz")
    print(f" LNB LO: {lnb_lo} MHz")
    print(f" IF Frequency: {if_mhz} MHz ({if_khz} kHz)")
    print(f" Symbol Rate: {sr_ksps} ksps ({sr_sps} sps)")
    print(f" Modulation: {mod_desc} (index {mod_index})")
    print(f" FEC: {fec_name} (index {fec_index})")
    if pol:
        pol_desc = {"H": "Horizontal (18V)", "V": "Vertical (13V)",
                    "L": "Left circular (18V)", "R": "Right circular (13V)"}
        print(f" Polarization: {pol_desc.get(pol, pol)}")
    if band:
        print(f" Band: {band} ({'22kHz on' if band == 'high' else '22kHz off'})")
    print()

    # Step 1: Check device status
    status = sw.get_config()
    print(f"[1/8] Config status: 0x{status:02X}")

    # Step 2: Boot demodulator if needed
    if not (status & 0x01):
        print("[2/8] Booting 8PSK demodulator...")
        sw.boot(on=True)
        time.sleep(0.5)
        status = sw.get_config()
        if not (status & 0x01):
            print(" FAILED: Device did not start")
            sys.exit(1)
        print(" OK")
    else:
        print("[2/8] Demodulator already running")

    # Step 3: Enable LNB power if needed
    if not (status & 0x04):
        print("[3/8] Enabling LNB power supply...")
        sw.start_intersil(on=True)
        time.sleep(0.3)
        status = sw.get_config()
        if not (status & 0x04):
            print(" FAILED: LNB power did not enable")
            sys.exit(1)
        print(" OK")
    else:
        print("[3/8] LNB power already on")

    # Step 4: Set LNB voltage (polarization)
    if pol:
        high_voltage = pol in ("H", "L")
        print(f"[4/8] Setting LNB voltage: {'18V' if high_voltage else '13V'}")
        sw.set_lnb_voltage(high_voltage)
    else:
        print("[4/8] LNB voltage: not changed (no --pol specified)")

    # Step 5: Extra voltage if requested
    if args.extra_volt:
        print("[5/8] Enabling +1V LNB boost")
        sw.set_extra_voltage(True)
    else:
        print("[5/8] Extra voltage: off")

    # Step 6: Set 22 kHz tone (band selection)
    if band:
        tone_on = (band == "high")
        print(f"[6/8] 22 kHz tone: {'ON' if tone_on else 'OFF'}")
        sw.set_22khz_tone(tone_on)
    else:
        print("[6/8] 22 kHz tone: not changed (no --band specified)")

    # Step 7: Send tune command
    print("[7/8] Sending TUNE_8PSK...")
    if sw.verbose:
        payload_hex = struct.pack('<II', sr_sps, if_khz).hex(' ')
        print(f" Payload: {payload_hex} {mod_index:02x} {fec_index:02x}")
    sw.tune(sr_sps, if_khz, mod_index, fec_index)

    # Step 8: Wait for lock. The monotonic clock keeps the timeout correct
    # even if the system time is adjusted while polling.
    timeout = args.timeout
    print(f"[8/8] Waiting for signal lock (timeout {timeout}s)...")
    deadline = time.monotonic() + timeout
    locked = False
    dots = 0
    while time.monotonic() < deadline:
        if sw.get_signal_lock():
            locked = True
            break
        print(".", end="", flush=True)
        dots += 1
        time.sleep(0.5)
    if dots:
        print()

    sig = None
    if locked:
        sig = sw.get_signal_strength()
        print("\n LOCKED")
        print(f" SNR: {sig['snr_db']:.1f} dB (raw 0x{sig['snr_raw']:04X})")
        print(f" Quality: {signal_bar(sig['snr_pct'])}")
    else:
        print(f"\n NO LOCK after {timeout}s")
        print(" Check frequency, symbol rate, polarization, and dish alignment")

    if args.json:
        out = {
            "tuned": True,
            "locked": locked,
            "freq_mhz": freq_mhz,
            "if_khz": if_khz,
            "sr_ksps": sr_ksps,
            "modulation": mod_name,
            "fec": fec_name,
        }
        if sig is not None:
            # Reuse the reading already shown above instead of querying the
            # device a second time.
            out["signal"] = sig
        print(f"\n{json.dumps(out, indent=2)}")
def cmd_stream(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Stream MPEG-2 transport data to file or stdout.

    Requires an existing signal lock. Data is read from the device in
    EP2_URB_SIZE chunks until ``args.duration`` seconds have passed or
    SIGINT/SIGTERM arrives. Progress is reported about once per second.

    In ``--stdout`` mode every status and error message goes to stderr so
    only transport-stream bytes reach the pipe. The output file is closed
    and the previous signal handlers are restored on every exit path,
    including failures while arming or disarming the transfer.
    """
    to_stdout = bool(args.stdout)
    if not to_stdout and not args.output:
        print("Specify -o FILE or --stdout", file=sys.stderr)
        sys.exit(1)
    # Status text must never share a stream with the TS payload.
    status_fd = sys.stderr if to_stdout else sys.stdout

    # Verify signal lock
    if not sw.get_signal_lock():
        print("No signal lock -- tune to a transponder first", file=status_fd)
        print(" Example: tune.py tune 12520 27500 --pol H --band high",
              file=status_fd)
        sys.exit(1)

    output_file = None if to_stdout else args.output
    output_fd = sys.stdout.buffer if to_stdout else open(output_file, 'wb')

    duration = args.duration
    total_bytes = 0
    start_time = time.monotonic()
    last_report = start_time
    running = True

    def stop_handler(signum, frame):
        nonlocal running
        running = False

    previous_handlers = {}
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            previous_handlers[signum] = signal.signal(signum, stop_handler)

        status_fd.write("Streaming TS data")
        if output_file:
            status_fd.write(f" to {output_file}")
        if duration:
            status_fd.write(f" for {duration}s")
        status_fd.write("\n")
        status_fd.flush()

        # Arm the transfer
        sw.arm_transfer(on=True)
        status_fd.write(" Armed. Reading EP2...\n")
        status_fd.flush()

        try:
            while running:
                if duration and (time.monotonic() - start_time) >= duration:
                    break
                chunk = sw.read_stream(EP2_URB_SIZE, timeout=2000)
                if chunk:
                    output_fd.write(chunk)
                    total_bytes += len(chunk)
                now = time.monotonic()
                if now - last_report >= 1.0:
                    elapsed = now - start_time
                    bitrate = (total_bytes * 8) / elapsed if elapsed > 0 else 0
                    if bitrate >= 1e6:
                        rate_str = f"{bitrate / 1e6:.2f} Mbps"
                    else:
                        rate_str = f"{bitrate / 1e3:.1f} kbps"
                    status_fd.write(f"\r {total_bytes:,} bytes {rate_str} "
                                    f"({elapsed:.0f}s) ")
                    status_fd.flush()
                    last_report = now
        finally:
            # A failed disarm (e.g. device unplugged) must not hide the
            # original error or skip the cleanup below.
            try:
                sw.arm_transfer(on=False)
            except usb.core.USBError as exc:
                status_fd.write(f"\n Warning: could not stop transfer: {exc}")
            status_fd.write(f"\n Stopped. Total: {total_bytes:,} bytes\n")
    finally:
        for signum, previous in previous_handlers.items():
            # signal.signal() returns None for handlers not installed from
            # Python; those cannot be passed back in.
            if previous is not None:
                signal.signal(signum, previous)
        if output_file:
            output_fd.close()
            status_fd.write(f" Saved to: {output_file}\n")
def cmd_diseqc(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Send DiSEqC commands.

    Exactly one action is performed, chosen in this order: ``--tone-burst``
    (mini-DiSEqC A/B), ``--port`` (DiSEqC 1.0 committed switch, ports 1-4),
    or ``--raw`` (3-6 hex bytes sent verbatim). Exits with status 1 on
    invalid input or when no action is given.
    """
    if args.tone_burst is not None:
        burst_name = args.tone_burst.upper()
        burst_val = {"A": 0, "B": 1}.get(burst_name)
        if burst_val is None:
            print("Tone burst must be A or B")
            sys.exit(1)
        print(f"Sending tone burst: SEC_MINI_{burst_name} (0x{burst_val:02X})")
        sw.send_diseqc_tone_burst(burst_val)
        print(" OK")
    elif args.port is not None:
        port = args.port
        if port < 1 or port > 4:
            print("DiSEqC 1.0 port must be 1-4")
            sys.exit(1)
        # DiSEqC 1.0 committed switch command:
        # Framing=0xE0 (command from master, no reply, first tx)
        # Address=0x10 (any switch)
        # Command=0x38 (Write N0 - committed switches)
        # Data bits: [7:4]=0xF (clear mask), [3:2]=port (option, position),
        # [1]=polarization, [0]=band
        # For simplicity only the port bits are set; the polarization and
        # band bits are sent as 0.
        data_byte = 0xF0 | ((port - 1) << 2)
        msg = bytes([0xE0, 0x10, 0x38, data_byte])
        print(f"Sending DiSEqC 1.0: port {port}")
        print(f" Message: {msg.hex(' ')}")
        sw.send_diseqc_message(msg)
        print(" OK")
    elif args.raw:
        try:
            # int() rejects non-hex text; bytes() rejects values above 0xFF.
            raw_bytes = bytes(int(b, 16) for b in args.raw)
        except ValueError:
            print("Raw DiSEqC bytes must be hex values 00-FF (e.g. E0 10 38 F0)")
            sys.exit(1)
        if len(raw_bytes) < 3 or len(raw_bytes) > 6:
            print("Raw DiSEqC message must be 3-6 bytes")
            sys.exit(1)
        print(f"Sending raw DiSEqC: {raw_bytes.hex(' ')}")
        sw.send_diseqc_message(raw_bytes)
        print(" OK")
    else:
        print("Specify --port, --tone-burst, or --raw")
        sys.exit(1)
def _parse_on_off(text: str) -> bool:
    """Convert an on/off style word to a bool; raise ValueError if unknown."""
    word = text.strip().lower()
    if word in ("on", "1", "true", "yes"):
        return True
    if word in ("off", "0", "false", "no"):
        return False
    raise ValueError(f"Expected on/off, got '{text}'")


def cmd_lnb(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Control LNB voltage and 22 kHz tone.

    With no options this only prints the current LNB state and changes
    nothing on the device. Otherwise it applies, in order: voltage, +1V
    boost, 22 kHz tone and supply power, then reads the config byte back to
    confirm.

    The LNB supply is switched on automatically only when a voltage, boost
    or tone change is requested and the caller did not ask for
    ``--power off``. Exits with status 1 on an invalid voltage or an
    unrecognised on/off word.
    """
    try:
        tone_on = None if args.tone is None else _parse_on_off(args.tone)
        power_on = None if args.power is None else _parse_on_off(args.power)
    except ValueError as exc:
        # Refuse ambiguous input instead of silently treating it as "off".
        print(exc)
        sys.exit(1)

    if args.voltage is not None and args.voltage not in (13, 18):
        print("Voltage must be 13 or 18")
        sys.exit(1)

    did_something = False
    wants_change = (args.voltage is not None or args.extra_volt
                    or tone_on is not None)

    # Ensure LNB power is on first, but only when a setting that needs it
    # is being changed and the user is not about to switch the supply off.
    if wants_change and power_on is not False:
        status = sw.get_config()
        if not (status & 0x04):
            print("Enabling LNB power supply...")
            sw.start_intersil(on=True)
            time.sleep(0.3)

    if args.voltage is not None:
        v = args.voltage
        print(f"Setting LNB voltage: {v}V")
        sw.set_lnb_voltage(v == 18)
        did_something = True

    if args.extra_volt:
        print("Enabling +1V LNB boost")
        sw.set_extra_voltage(True)
        did_something = True

    if tone_on is not None:
        print(f"22 kHz tone: {'ON' if tone_on else 'OFF'}")
        sw.set_22khz_tone(tone_on)
        did_something = True

    if power_on is not None:
        if power_on:
            print("Enabling LNB power supply")
            sw.start_intersil(on=True)
        else:
            print("Disabling LNB power supply")
            sw.start_intersil(on=False)
        did_something = True

    if not did_something:
        # Just show current LNB state
        status = sw.get_config()
        print("LNB Status:")
        print(f" Power: {'ON' if status & 0x04 else 'off'}")
        print(f" Voltage: {'18V' if status & 0x20 else '13V'}")
        print(f" 22 kHz: {'ON' if status & 0x10 else 'off'}")
        print(f" Armed: {'YES' if status & 0x80 else 'no'}")
    else:
        # Read back config to confirm
        time.sleep(0.1)
        status = sw.get_config()
        print(f"\nConfig: 0x{status:02X}")
        print(f" Power: {'ON' if status & 0x04 else 'off'} "
              f"Voltage: {'18V' if status & 0x20 else '13V'} "
              f"22kHz: {'ON' if status & 0x10 else 'off'}")
# -- CLI --
def build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line parser.

    Global options are -v/--verbose and --json. The chosen subcommand
    (status, tune, stream, diseqc or lnb) is stored in ``args.command``.
    """
    usage_examples = """\
examples:
%(prog)s status
%(prog)s tune 12520 27500 --pol H --band high
%(prog)s tune 12520 27500 --pol H --band high --mod qpsk --fec auto
%(prog)s stream -o capture.ts --duration 60
%(prog)s stream --stdout | vlc -
%(prog)s diseqc --port 1
%(prog)s diseqc --tone-burst A
%(prog)s diseqc --raw E0 10 38 F0
%(prog)s lnb --voltage 18 --tone on
"""
    root = argparse.ArgumentParser(
        description="Genpix SkyWalker-1 DVB-S tuning and streaming tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    root.add_argument('-v', '--verbose', action='store_true',
                      help="Show raw USB traffic")
    root.add_argument('--json', action='store_true',
                      help="Output machine-readable JSON where supported")
    commands = root.add_subparsers(dest='command')

    # status: takes no options of its own
    commands.add_parser(
        'status', help="Show device config, FW version, signal status")

    # tune: two positionals plus LNB / modulation options
    tune_cmd = commands.add_parser('tune', help="Tune to a transponder")
    tune_cmd.add_argument(
        'freq', type=float,
        help="Transponder downlink frequency in MHz (e.g. 12520)")
    tune_cmd.add_argument(
        'sr', type=int,
        help="Symbol rate in ksps (e.g. 27500)")
    tune_cmd.add_argument(
        '--pol', choices=['H', 'V', 'L', 'R', 'h', 'v', 'l', 'r'],
        help="Polarization: H/V (linear) or L/R (circular)")
    tune_cmd.add_argument(
        '--band', choices=['low', 'high'],
        help="LNB band: low (tone off) or high (tone on)")
    tune_cmd.add_argument(
        '--lnb-lo', type=float, default=None,
        help="LNB LO frequency in MHz (default: 9750 low, 10600 high)")
    tune_cmd.add_argument(
        '--mod', default='qpsk', choices=list(MODULATIONS.keys()),
        help="Modulation type (default: qpsk)")
    tune_cmd.add_argument(
        '--fec', default='auto',
        help="FEC rate (default: auto). Options depend on modulation.")
    tune_cmd.add_argument(
        '--timeout', type=float, default=10,
        help="Signal lock timeout in seconds (default: 10)")
    tune_cmd.add_argument(
        '--extra-volt', action='store_true',
        help="Enable +1V LNB voltage boost for long cables")

    # stream: output destination and optional duration
    stream_cmd = commands.add_parser('stream', help="Stream MPEG-2 TS data")
    stream_cmd.add_argument('-o', '--output', help="Output file for TS data")
    stream_cmd.add_argument(
        '--stdout', action='store_true',
        help="Write TS stream to stdout (pipe to vlc, ffmpeg, etc)")
    stream_cmd.add_argument(
        '--duration', type=float, default=None,
        help="Capture duration in seconds (default: until CTRL-C)")

    # diseqc: switch port, tone burst or raw message
    diseqc_cmd = commands.add_parser('diseqc', help="Send DiSEqC commands")
    diseqc_cmd.add_argument(
        '--port', type=int,
        help="DiSEqC 1.0 switch port (1-4)")
    diseqc_cmd.add_argument(
        '--tone-burst', metavar='A|B',
        help="Mini DiSEqC tone burst (A or B)")
    diseqc_cmd.add_argument(
        '--raw', nargs='+', metavar='HH',
        help="Raw DiSEqC bytes in hex (e.g. E0 10 38 F0)")

    # lnb: voltage, tone, boost and supply power
    lnb_cmd = commands.add_parser('lnb', help="LNB voltage and tone control")
    lnb_cmd.add_argument(
        '--voltage', type=int, choices=[13, 18],
        help="LNB voltage (13V or 18V)")
    lnb_cmd.add_argument('--tone', help="22 kHz tone (on/off)")
    lnb_cmd.add_argument(
        '--extra-volt', action='store_true',
        help="Enable +1V LNB voltage boost")
    lnb_cmd.add_argument('--power', help="LNB power supply (on/off)")

    return root
def main():
    """Parse the command line and run the selected subcommand.

    Falls back to ``status`` when no subcommand is given. The device is
    opened for the duration of the handler call and closed afterwards.
    """
    parser = build_parser()
    args = parser.parse_args()

    if not args.command:
        # Default to status if no subcommand
        args.command, args.json = 'status', getattr(args, 'json', False)

    handlers = {
        'status': cmd_status,
        'tune': cmd_tune,
        'stream': cmd_stream,
        'diseqc': cmd_diseqc,
        'lnb': cmd_lnb,
    }
    try:
        handler = handlers[args.command]
    except KeyError:
        parser.print_help()
        sys.exit(1)

    with SkyWalker1(verbose=args.verbose) as device:
        handler(device, args)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()