#!/usr/bin/env python3 """ Genpix SkyWalker-1 shared library. Provides the SkyWalker1 USB interface class, constants, and signal processing utilities used by skywalker.py and tune.py. """ import sys import struct import time import math try: import usb.core import usb.util except ImportError: print("pyusb required: pip install pyusb") sys.exit(1) # --- USB identifiers --- VENDOR_ID = 0x09C0 PRODUCT_ID = 0x0203 EP2_ADDR = 0x82 EP2_URB_SIZE = 8192 # --- Vendor commands --- CMD_GET_8PSK_CONFIG = 0x80 CMD_I2C_WRITE = 0x83 CMD_I2C_READ = 0x84 CMD_ARM_TRANSFER = 0x85 CMD_TUNE_8PSK = 0x86 CMD_GET_SIGNAL_STRENGTH = 0x87 CMD_LOAD_BCM4500 = 0x88 CMD_BOOT_8PSK = 0x89 CMD_START_INTERSIL = 0x8A CMD_SET_LNB_VOLTAGE = 0x8B CMD_SET_22KHZ_TONE = 0x8C CMD_SEND_DISEQC = 0x8D CMD_GET_SIGNAL_LOCK = 0x90 CMD_GET_FW_VERS = 0x92 CMD_GET_SERIAL_NUMBER = 0x93 CMD_USE_EXTRA_VOLT = 0x94 # Custom commands (v3.01+) CMD_SPECTRUM_SWEEP = 0xB0 CMD_RAW_DEMOD_READ = 0xB1 CMD_RAW_DEMOD_WRITE = 0xB2 CMD_BLIND_SCAN = 0xB3 CMD_I2C_BUS_SCAN = 0xB4 CMD_I2C_RAW_READ = 0xB5 CMD_I2C_DIAG = 0xB6 # Custom commands (v3.02+) CMD_SIGNAL_MONITOR = 0xB7 CMD_TUNE_MONITOR = 0xB8 CMD_MULTI_REG_READ = 0xB9 # Custom commands (v3.03+) CMD_PARAM_SWEEP = 0xBA CMD_ADAPTIVE_BLIND_SCAN = 0xBB CMD_GET_LAST_ERROR = 0xBC CMD_GET_STREAM_DIAG = 0xBD CMD_GET_HOTPLUG_STATUS = 0xBE # Error codes (returned by CMD_GET_LAST_ERROR) ERR_OK = 0x00 ERR_I2C_TIMEOUT = 0x01 ERR_I2C_NAK = 0x02 ERR_I2C_ARB_LOST = 0x03 ERR_BCM_NOT_READY = 0x04 ERR_BCM_TIMEOUT = 0x05 ERROR_NAMES = { ERR_OK: "OK", ERR_I2C_TIMEOUT: "I2C timeout", ERR_I2C_NAK: "I2C NAK (no ACK from slave)", ERR_I2C_ARB_LOST: "I2C arbitration lost", ERR_BCM_NOT_READY: "BCM4500 not ready", ERR_BCM_TIMEOUT: "BCM4500 command timeout", } # --- Config status bits --- CONFIG_BITS = { 0x01: ("8PSK Started", "bm8pskStarted"), 0x02: ("BCM4500 FW Loaded", "bm8pskFW_Loaded"), 0x04: ("LNB Power On", "bmIntersilOn"), 0x08: ("DVB Mode", "bmDVBmode"), 0x10: ("22 kHz Tone", "bm22kHz"), 0x20: ("18V Selected", "bmSEL18V"), 0x40: ("DC Tuned", "bmDCtuned"), 0x80: ("Armed (streaming)", "bmArmed"), } # --- Modulation and FEC tables --- MODULATIONS = { "qpsk": (0, "DVB-S QPSK"), "turbo-qpsk": (1, "Turbo QPSK"), "turbo-8psk": (2, "Turbo 8PSK"), "turbo-16qam": (3, "Turbo 16QAM"), "dcii-combo": (4, "DCII Combo"), "dcii-i": (5, "DCII I-stream"), "dcii-q": (6, "DCII Q-stream"), "dcii-oqpsk": (7, "DCII Offset QPSK"), "dss": (8, "DSS QPSK"), "bpsk": (9, "DVB BPSK"), } FEC_RATES = { "dvbs": { "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, "7/8": 4, "auto": 5, "none": 6, }, "turbo": { "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, "auto": 4, }, "turbo-16qam": { "3/4": 0, "auto": 0, }, "dcii": { "1/2": 0, "2/3": 1, "6/7": 2, "3/4": 3, "5/11": 4, "1/2+": 5, "2/3+": 6, "6/7+": 7, "3/4+": 8, "auto": 0, }, } MOD_FEC_GROUP = { "qpsk": "dvbs", "turbo-qpsk": "turbo", "turbo-8psk": "turbo", "turbo-16qam": "turbo-16qam", "dcii-combo": "dcii", "dcii-i": "dcii", "dcii-q": "dcii", "dcii-oqpsk": "dcii", "dss": "dvbs", "bpsk": "dvbs", } # --- LNB defaults --- LNB_LO_LOW = 9750 # Universal LNB low-band (MHz) LNB_LO_HIGH = 10600 # Universal LNB high-band (MHz) # --- L-band allocations (for annotation) --- LBAND_ALLOCATIONS = [ (1240, 1300, "Amateur 23cm"), (1525, 1559, "Inmarsat downlink"), (1559, 1610, "GNSS (GPS L1, Galileo E1)"), (1610, 1626, "Iridium downlink"), (1670, 1710, "MetSat (GOES LRIT, NOAA HRPT)"), (1710, 1785, "LTE/AWS uplink"), (1920, 2025, "UMTS uplink"), ] # --- Signal processing helpers --- def snr_raw_to_db(snr_raw: int) -> float: """Convert BCM4500 SNR register to dB. Register is dBu * 256.""" return snr_raw / 256.0 def snr_raw_to_pct(snr_raw: int) -> float: """Convert raw SNR to percentage (0-100 scale, clamped).""" scaled = min(snr_raw * 17, 65535) return (scaled / 65535) * 100 def agc_to_power_db(agc1: int, agc2: int) -> float: """ Estimate received power from AGC register values. The AGC loop adjusts gain to keep the signal level constant at the ADC input. Higher AGC = weaker signal (more gain needed). This is an approximation; the exact mapping depends on the BCM3440 tuner's gain curve. Returns a relative dB value (higher = stronger signal). """ # AGC1 is the primary gain control, AGC2 is fine adjustment. # Invert: low AGC value = high signal = high power. # Scale to approximate dB with ~40 dB dynamic range. combined = agc1 + (agc2 >> 4) if combined == 0: return 0.0 # Rough linear-to-dB: 65535 AGC ≈ -40 dB, 0 AGC ≈ 0 dB return -40.0 * (combined / 65535.0) def detect_peaks(freqs: list, powers: list, threshold_db: float = 3.0) -> list: """ Find peaks in a spectrum sweep. Returns list of (freq_mhz, power_db, index) tuples for each local maximum that exceeds the noise floor by threshold_db. """ if len(powers) < 3: return [] # Estimate noise floor as the 25th percentile sorted_p = sorted(powers) noise_floor = sorted_p[len(sorted_p) // 4] peaks = [] for i in range(1, len(powers) - 1): if powers[i] > powers[i - 1] and powers[i] > powers[i + 1]: if powers[i] - noise_floor >= threshold_db: peaks.append((freqs[i], powers[i], i)) return peaks def if_to_rf(if_mhz: float, lnb_lo: float) -> float: """Convert IF frequency to actual RF frequency given LNB LO.""" return if_mhz + lnb_lo def rf_to_if(rf_mhz: float, lnb_lo: float) -> float: """Convert actual RF frequency to IF frequency given LNB LO.""" return rf_mhz - lnb_lo def signal_bar(pct: float, width: int = 40) -> str: """Render an ASCII signal strength bar.""" filled = int(pct / 100 * width) filled = max(0, min(filled, width)) bar = '#' * filled + '-' * (width - filled) return f"[{bar}] {pct:.1f}%" def format_config_bits(status: int) -> list: """Return list of (bit_name, is_set) tuples for config byte.""" result = [] for bit, (name, _field) in CONFIG_BITS.items(): result.append((name, bool(status & bit))) return result # --- SkyWalker1 USB interface --- class SkyWalker1: """USB interface to the Genpix SkyWalker-1 DVB-S receiver.""" def __init__(self, verbose: bool = False): self.dev = None self.detached_intf = None self.verbose = verbose def open(self) -> None: """Find and claim the SkyWalker-1 USB device.""" self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) if self.dev is None: print("SkyWalker-1 not found (VID 0x09C0, PID 0x0203). Is it plugged in?") sys.exit(1) for cfg in self.dev: for intf in cfg: if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): try: self.dev.detach_kernel_driver(intf.bInterfaceNumber) self.detached_intf = intf.bInterfaceNumber if self.verbose: print(f" Detached kernel driver from interface {intf.bInterfaceNumber}") except usb.core.USBError as e: print(f"Cannot detach kernel driver: {e}") print("The gp8psk module must be unbound first. Try one of:") print(" sudo modprobe -r dvb_usb_gp8psk") print(" echo '' | sudo tee /sys/bus/usb/drivers/gp8psk/unbind") sys.exit(1) try: self.dev.set_configuration() except usb.core.USBError: pass def close(self) -> None: """Release device and re-attach kernel driver.""" if self.dev is None: return if self.detached_intf is not None: try: usb.util.release_interface(self.dev, self.detached_intf) self.dev.attach_kernel_driver(self.detached_intf) if self.verbose: print("Re-attached kernel driver") except usb.core.USBError: print("Note: run 'sudo modprobe dvb_usb_gp8psk' to reload driver") def __enter__(self): self.open() return self def __exit__(self, *exc): self.close() # -- Low-level USB transfers -- def _vendor_in(self, request: int, value: int = 0, index: int = 0, length: int = 64, retries: int = 3) -> bytes: """Vendor IN control transfer (device-to-host), with retry.""" for attempt in range(retries): try: data = self.dev.ctrl_transfer( usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN, request, value, index, length, 2000 ) if self.verbose: raw = bytes(data).hex(' ') print(f" USB IN req=0x{request:02X} val=0x{value:04X} " f"idx=0x{index:04X} -> [{len(data)}] {raw}") if len(data) == length: return bytes(data) if self.verbose: print(f" Partial read ({len(data)}/{length}), retry {attempt + 1}") continue except usb.core.USBError as e: if self.verbose: print(f" USB IN req=0x{request:02X} FAILED: {e}") if attempt == retries - 1: raise return bytes(data) def _vendor_out(self, request: int, value: int = 0, index: int = 0, data: bytes = b'') -> int: """Vendor OUT control transfer (host-to-device).""" if self.verbose: raw = data.hex(' ') if data else "(no data)" print(f" USB OUT req=0x{request:02X} val=0x{value:04X} " f"idx=0x{index:04X} data=[{len(data)}] {raw}") return self.dev.ctrl_transfer( usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT, request, value, index, data, 2000 ) # -- Device info -- def get_config(self) -> int: """Read 8PSK config status byte.""" data = self._vendor_in(CMD_GET_8PSK_CONFIG, length=1) return data[0] def get_fw_version(self) -> dict: """Read firmware version. Returns dict with version string and date.""" data = self._vendor_in(CMD_GET_FW_VERS, length=6) return { "major": data[2], "minor": data[1], "patch": data[0], "version": f"{data[2]}.{data[1]:02d}.{data[0]}", "date": f"20{data[5]:02d}-{data[4]:02d}-{data[3]:02d}", } def get_signal_lock(self) -> bool: """Read signal lock status.""" data = self._vendor_in(CMD_GET_SIGNAL_LOCK, length=1) return data[0] != 0 def get_signal_strength(self) -> dict: """Read signal strength. Returns SNR info dict.""" data = self._vendor_in(CMD_GET_SIGNAL_STRENGTH, length=6) snr_raw = struct.unpack_from(' int: """Power on/off the 8PSK demodulator. Custom firmware returns 3 bytes: [config_status, boot_stage, debug]. Stock firmware returns 1 byte. Request 3 to handle both. """ data = self._vendor_in(CMD_BOOT_8PSK, value=int(on), length=3) return data[0] def start_intersil(self, on: bool = True) -> int: """Enable/disable LNB power supply.""" data = self._vendor_in(CMD_START_INTERSIL, value=int(on), length=1) return data[0] def set_lnb_voltage(self, high: bool) -> None: """Set LNB voltage: high=True for 18V, False for 13V.""" self._vendor_out(CMD_SET_LNB_VOLTAGE, value=int(high)) def set_22khz_tone(self, on: bool) -> None: """Enable/disable 22 kHz tone.""" self._vendor_out(CMD_SET_22KHZ_TONE, value=int(on)) def set_extra_voltage(self, on: bool) -> None: """Enable +1V LNB boost: 13->14V, 18->19V.""" self._vendor_out(CMD_USE_EXTRA_VOLT, value=int(on)) # -- Tuning -- def tune(self, symbol_rate_sps: int, freq_khz: int, mod_index: int, fec_index: int) -> None: """Send TUNE_8PSK with 10-byte payload.""" payload = struct.pack(' None: """Start/stop MPEG-2 transport stream.""" self._vendor_out(CMD_ARM_TRANSFER, value=int(on)) def read_stream(self, size: int = EP2_URB_SIZE, timeout: int = 1000) -> bytes: """Read a chunk from the TS bulk endpoint.""" try: data = self.dev.read(EP2_ADDR, size, timeout) return bytes(data) except usb.core.USBTimeoutError: return b'' except usb.core.USBError as e: if self.verbose: print(f" EP2 read error: {e}") return b'' # -- DiSEqC -- def send_diseqc_tone_burst(self, mini_cmd: int) -> None: """Send tone burst (mini-DiSEqC). 0=SEC_MINI_A, 1=SEC_MINI_B.""" self._vendor_out(CMD_SEND_DISEQC, value=mini_cmd) def send_diseqc_message(self, msg: bytes) -> None: """Send full DiSEqC message (3-6 bytes).""" if len(msg) < 3 or len(msg) > 6: raise ValueError(f"DiSEqC message must be 3-6 bytes, got {len(msg)}") self._vendor_out(CMD_SEND_DISEQC, value=msg[0], data=msg) # -- New commands (v3.02+) -- def signal_monitor(self) -> dict: """ Fast combined signal read (0xB7). Returns 8 bytes in one transfer: SNR(2) + AGC1(2) + AGC2(2) + lock(1) + status(1). """ data = self._vendor_in(CMD_SIGNAL_MONITOR, length=8) snr_raw = struct.unpack_from(' dict: """ Tune + dwell + signal read in one round-trip (0xB8). Sends tune parameters via OUT phase, firmware tunes + waits dwell_ms + reads signal. Then IN phase returns the result. """ dwell_ms = max(1, min(255, dwell_ms)) payload = struct.pack(' bytes: """ Batch read of contiguous BCM4500 indirect registers (0xB9). Returns count bytes, one per register. Up to 64 registers in a single USB transfer (vs. individual 0xB1 reads). """ count = max(1, min(64, count)) data = self._vendor_in(CMD_MULTI_REG_READ, value=start_reg, index=count, length=count) return bytes(data) # -- Device info (extended) -- def get_serial_number(self) -> bytes: """Read 8-byte serial number from device.""" return self._vendor_in(CMD_GET_SERIAL_NUMBER, length=8) def get_usb_speed(self) -> int: """Read USB connection speed. 0=unknown, 1=Full, 2=High.""" data = self._vendor_in(0x07, length=1) return data[0] def get_vendor_string(self) -> str: """Read vendor string descriptor from FX2.""" data = self._vendor_in(0x0C, length=64) return bytes(data).rstrip(b'\x00').decode('ascii', errors='replace') def get_product_string(self) -> str: """Read product string descriptor from FX2.""" data = self._vendor_in(0x0D, length=64) return bytes(data).rstrip(b'\x00').decode('ascii', errors='replace') # -- FX2 RAM access (standard Cypress A0 vendor request) -- def fx2_ram_read(self, addr: int, length: int) -> bytes: """Read FX2 internal RAM via A0 vendor request. Non-destructive.""" length = max(1, min(64, length)) data = self.dev.ctrl_transfer( usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN, 0xA0, addr, 0, length, 2000 ) if self.verbose: raw = bytes(data).hex(' ') print(f" RAM IN addr=0x{addr:04X} len={length} -> {raw}") return bytes(data) def fx2_ram_write(self, addr: int, data: bytes) -> int: """Write FX2 internal RAM via A0 vendor request. Reverts on power cycle.""" if self.verbose: raw = data.hex(' ') print(f" RAM OUT addr=0x{addr:04X} len={len(data)} data={raw}") return self.dev.ctrl_transfer( usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT, 0xA0, addr, 0, data, 2000 ) def fx2_cpu_halt(self) -> None: """Halt FX2 CPU by writing 1 to CPUCS register (0xE600).""" self.fx2_ram_write(0xE600, b'\x01') def fx2_cpu_start(self) -> None: """Release FX2 CPU by writing 0 to CPUCS register (0xE600).""" self.fx2_ram_write(0xE600, b'\x00') # -- EEPROM access (via I2C proxy commands) -- EEPROM_SLAVE = 0x51 EEPROM_PAGE_SIZE = 16 EEPROM_WRITE_CYCLE_MS = 10 def eeprom_read(self, offset: int, length: int = 64) -> bytes: """Read from boot EEPROM at given offset via I2C.""" return self._vendor_in(CMD_I2C_READ, value=self.EEPROM_SLAVE, index=offset, length=length) def eeprom_write_page(self, offset: int, data: bytes) -> int: """Write a page (up to 16 bytes) to EEPROM. Caller handles alignment.""" return self._vendor_out(CMD_I2C_WRITE, value=self.EEPROM_SLAVE, index=offset, data=data) def eeprom_read_all(self, size: int = 16384) -> bytes: """Read entire EEPROM contents up to size bytes.""" chunk_size = 64 result = bytearray() for offset in range(0, size, chunk_size): remaining = min(chunk_size, size - offset) chunk = self.eeprom_read(offset, remaining) result.extend(chunk) return bytes(result) # -- Diagnostics -- def boot_debug(self, mode: int) -> dict: """ Run boot diagnostic with specified mode byte. Modes: 0x80=no-op, 0x81=GPIO init, 0x82=I2C probe, 0x83=BCM4500 reset, 0x84=FW load, 0x85=full boot. Returns 3-byte status: {stage, result, detail}. """ data = self._vendor_in(CMD_BOOT_8PSK, value=mode, length=3) return { "stage": data[0], "result": data[1], "detail": data[2], } def i2c_bus_scan(self) -> list[int]: """ Scan I2C bus for responding devices. Returns list of 7-bit slave addresses that ACK'd. The firmware returns a 16-byte bitmap (128 bits for addresses 0-127). """ data = self._vendor_in(CMD_I2C_BUS_SCAN, length=16) addresses = [] for byte_idx in range(16): for bit_idx in range(8): if data[byte_idx] & (1 << bit_idx): addresses.append(byte_idx * 8 + bit_idx) return addresses def i2c_raw_read(self, slave: int, reg: int) -> int: """Read a single register from an I2C device.""" data = self._vendor_in(CMD_I2C_RAW_READ, value=slave, index=reg, length=1) return data[0] # -- High-level sweep helpers -- def sweep_spectrum(self, start_mhz: float, stop_mhz: float, step_mhz: float = 5.0, dwell_ms: int = 10, sr_ksps: int = 20000, mod_index: int = 0, fec_index: int = 5, callback=None) -> tuple: """ Sweep a frequency range and return power measurements. Uses TUNE_MONITOR (0xB8) at each step for efficient measurement. Default tune params: QPSK, auto-FEC, 20 Msps. callback(freq_mhz, step_num, total_steps, result) is called per step if provided. Returns (freqs_mhz[], powers_db[], raw_results[]). """ sr_sps = sr_ksps * 1000 freqs = [] powers = [] results = [] freq = start_mhz steps = int((stop_mhz - start_mhz) / step_mhz) + 1 step_num = 0 while freq <= stop_mhz: freq_khz = int(freq * 1000) result = self.tune_monitor(sr_sps, freq_khz, mod_index, fec_index, dwell_ms) freqs.append(freq) powers.append(result["power_db"]) results.append(result) if callback: callback(freq, step_num, steps, result) step_num += 1 freq += step_mhz return freqs, powers, results def ensure_booted(self) -> None: """Boot demodulator and enable LNB power if not already running.""" status = self.get_config() if not (status & 0x01): self.boot(on=True) time.sleep(0.5) status = self.get_config() if not (status & 0x01): raise RuntimeError("Device failed to start") if not (status & 0x04): self.start_intersil(on=True) time.sleep(0.3) def configure_lnb(self, pol: str = None, band: str = None, lnb_lo: float = None, disable_lnb: bool = False) -> float: """ Configure LNB voltage, tone, and return the effective LO frequency. pol: 'H'/'V'/'L'/'R' or None (don't change) band: 'low'/'high' or None (don't change) lnb_lo: explicit LO freq in MHz, or None for auto disable_lnb: True to disable LNB power (for direct input) """ if disable_lnb: self.start_intersil(on=False) return 0.0 if pol: high_voltage = pol.upper() in ("H", "L") self.set_lnb_voltage(high_voltage) if band: self.set_22khz_tone(band == "high") if lnb_lo is not None: return lnb_lo elif band == "high": return LNB_LO_HIGH else: return LNB_LO_LOW # -- New commands (v3.03+) -- def get_last_error(self) -> int: """Read last firmware error code (0xBC).""" data = self._vendor_in(CMD_GET_LAST_ERROR, length=1) return data[0] def get_last_error_str(self) -> str: """Read last firmware error code as human-readable string.""" code = self.get_last_error() return ERROR_NAMES.get(code, f"Unknown (0x{code:02X})") def param_sweep(self, start_khz: int, stop_khz: int, step_khz: int, sr_sps: int, mod_index: int = 0, fec_index: int = 5) -> bytes: """ Parameterized spectrum sweep (0xBA). Returns raw EP2 bulk data containing u16 LE power values, one per frequency step. """ payload = struct.pack(' dict | None: """ Adaptive blind scan (0xBB) with AGC pre-check. Returns lock result dict or None if no lock found. """ payload = struct.pack(' None: """Stop motor movement immediately.""" self.send_diseqc_message(diseqc_halt()) def motor_drive_east(self, steps: int = 0) -> None: """Drive motor east. steps=0 for continuous, 1-127 for step count.""" self.send_diseqc_message(diseqc_drive_east(steps)) def motor_drive_west(self, steps: int = 0) -> None: """Drive motor west. steps=0 for continuous, 1-127 for step count.""" self.send_diseqc_message(diseqc_drive_west(steps)) def motor_store_position(self, slot: int) -> None: """Store current position in slot (0-255).""" self.send_diseqc_message(diseqc_store_position(slot)) def motor_goto_position(self, slot: int) -> None: """Go to stored position slot (0-255). Slot 0 = reference/zero.""" self.send_diseqc_message(diseqc_goto_position(slot)) def motor_goto_x(self, observer_lon: float, sat_lon: float) -> None: """USALS GotoX: calculate and drive to satellite position.""" self.send_diseqc_message(diseqc_goto_x(observer_lon, sat_lon)) def motor_set_limit(self, direction: str) -> None: """Set soft limit at current position. direction: 'east' or 'west'.""" self.send_diseqc_message(diseqc_set_limit(direction)) def motor_disable_limits(self) -> None: """Disable east/west soft limits.""" self.send_diseqc_message(diseqc_disable_limits()) # -- Streaming diagnostics (v3.04+) -- def get_stream_diag(self, reset: bool = False) -> dict: """Read streaming diagnostics counters (0xBD). Returns dict with poll_count, overflow_count, sync_loss, last_status, last_lock, armed, had_sync. Set reset=True to clear counters after read. """ wval = 1 if reset else 0 data = self._vendor_in(CMD_GET_STREAM_DIAG, value=wval, length=12) poll_count = struct.unpack_from(' dict: """Read I2C hot-plug detection status (0xBE). Returns dict with current/previous bus bitmaps, change count, devices added/removed in last scan, and decoded address lists. Set reset=True to clear change counter. Set force_scan=True to trigger immediate I2C rescan. """ wval = 2 if force_scan else (1 if reset else 0) data = self._vendor_in(CMD_GET_HOTPLUG_STATUS, value=wval, length=36) current_bitmap = bytes(data[0:16]) changes = struct.unpack_from(' list[int]: """Convert 16-byte I2C address bitmap to list of 7-bit addresses.""" addrs = [] for byte_idx in range(16): for bit in range(8): if bitmap[byte_idx] & (1 << bit): addrs.append((byte_idx << 3) | bit) return addrs # --- DiSEqC 1.2 command builders --- def diseqc_halt() -> bytes: """Stop positioner movement (DiSEqC 1.2 Halt).""" return bytes([0xE0, 0x31, 0x60]) def diseqc_drive_east(steps: int = 0) -> bytes: """Drive east. steps=0 for continuous, 1-127 for step count.""" return bytes([0xE0, 0x31, 0x68, min(steps, 0x7F)]) def diseqc_drive_west(steps: int = 0) -> bytes: """Drive west. steps=0 for continuous, 1-127 for step count.""" return bytes([0xE0, 0x31, 0x69, min(steps, 0x7F)]) def diseqc_store_position(slot: int) -> bytes: """Store current position in slot (0-255).""" return bytes([0xE0, 0x31, 0x6A, slot & 0xFF]) def diseqc_goto_position(slot: int) -> bytes: """Go to stored position (0-255). Slot 0 = reference/zero.""" return bytes([0xE0, 0x31, 0x6B, slot & 0xFF]) def diseqc_set_limit(direction: str) -> bytes: """Set east or west software limit at current position.""" if direction.lower() == "east": return bytes([0xE0, 0x31, 0x66, 0x00]) else: return bytes([0xE0, 0x31, 0x66, 0x01]) def diseqc_disable_limits() -> bytes: """Disable software limits.""" return bytes([0xE0, 0x31, 0x63]) def diseqc_goto_x(observer_lon: float, sat_lon: float) -> bytes: """ USALS GotoX command (DiSEqC 1.3 extension). Calculates motor rotation angle from observer and satellite longitude, then encodes as DiSEqC 1.2 GotoX (E0 31 6E HH LL). """ angle = usals_angle(observer_lon, sat_lon) hh, ll = usals_encode_angle(angle) return bytes([0xE0, 0x31, 0x6E, hh, ll]) def usals_angle(observer_lon: float, sat_lon: float, observer_lat: float = 0.0) -> float: """ Calculate USALS motor rotation angle in degrees. Positive = east, negative = west. Uses the standard USALS formula from DiSEqC 1.3 spec. observer_lat defaults to 0 (equator) for simplicity; the motor corrects for elevation internally. """ # Convert to radians obs_lon_r = math.radians(observer_lon) sat_lon_r = math.radians(sat_lon) obs_lat_r = math.radians(observer_lat) # Longitude difference delta_lon = sat_lon_r - obs_lon_r # USALS formula: angle = atan2(sin(delta_lon), cos(delta_lon) - R) # where R = Re / (Re + h) ≈ 0.1513 for GEO orbit # Simplified for equatorial mount: angle = math.degrees(math.atan2( math.sin(delta_lon), math.cos(delta_lon) - 6378.0 / (6378.0 + 35786.0) )) return angle def usals_encode_angle(angle_deg: float) -> tuple: """ Encode USALS angle to DiSEqC 1.3 byte pair (HH, LL). Format: HH.HL where HH = integer degrees, H nibble of LL = tenths, L nibble of LL = sixteenths. Bit 7 of HH = direction (1=west). """ west = angle_deg < 0 angle = abs(angle_deg) degrees = int(angle) fraction = angle - degrees # Fraction encoded as: upper nibble = tenths (0-9), # lower nibble = sixteenths (0-15) tenths = int(fraction * 10) & 0x0F sixteenths = int((fraction * 10 - tenths) * 16) & 0x0F hh = degrees & 0x7F if west: hh |= 0x80 # bit 7 = west ll = (tenths << 4) | sixteenths return hh, ll