diff --git a/firmware/skywalker1.c b/firmware/skywalker1.c index 48b4060..2a1a0c4 100644 --- a/firmware/skywalker1.c +++ b/firmware/skywalker1.c @@ -2,8 +2,9 @@ * Genpix SkyWalker-1 Custom Firmware * For Cypress CY7C68013A (FX2LP) + Broadcom BCM4500 demodulator * - * Stock-compatible vendor commands (0x80-0x94) plus new - * spectrum sweep, raw demod access, and blind scan commands (0xB0-0xB3). + * Stock-compatible vendor commands (0x80-0x94) plus custom + * spectrum sweep, raw demod access, blind scan (0xB0-0xB3), + * hardware diagnostics (0xB4-0xB6), and signal monitoring (0xB7-0xB9). * * SDCC + fx2lib toolchain. Loaded into FX2 RAM for testing. */ @@ -53,6 +54,9 @@ #define RAW_DEMOD_READ 0xB1 #define RAW_DEMOD_WRITE 0xB2 #define BLIND_SCAN 0xB3 +#define SIGNAL_MONITOR 0xB7 +#define TUNE_MONITOR 0xB8 +#define MULTI_REG_READ 0xB9 /* configuration status byte bits */ #define BM_STARTED 0x01 @@ -85,6 +89,9 @@ volatile __bit got_sud; static __xdata BYTE i2c_buf[16]; static __xdata BYTE i2c_rd[8]; +/* TUNE_MONITOR result buffer: filled by OUT phase, returned by IN phase */ +static __xdata BYTE tm_result[10]; + /* * BCM4500 register initialization data extracted from stock v2.06 firmware. * FUN_CODE_0ddd writes these 3 blocks to BCM4500 indirect registers (page 0) @@ -1055,8 +1062,8 @@ BOOL handle_vendorcommand(BYTE cmd) { /* 0x92: GET_FW_VERS -- return firmware version and build date */ case GET_FW_VERS: - EP0BUF[0] = 0x00; /* patch -> version 3.01.0 */ - EP0BUF[1] = 0x01; /* minor */ + EP0BUF[0] = 0x00; /* patch -> version 3.02.0 */ + EP0BUF[1] = 0x02; /* minor */ EP0BUF[2] = 0x03; /* major */ EP0BUF[3] = 0x0C; /* day = 12 */ EP0BUF[4] = 0x02; /* month = 2 */ @@ -1216,6 +1223,90 @@ BOOL handle_vendorcommand(BYTE cmd) { return TRUE; } + /* 0xB7: SIGNAL_MONITOR -- fast combined signal read (8 bytes) + * Returns SNR(2) + AGC1(2) + AGC2(2) + lock(1) + status(1) + * in a single USB transfer instead of 3 separate reads. */ + case SIGNAL_MONITOR: { + BYTE sm_val; + /* SNR: indirect regs 0x00-0x01 */ + bcm_indirect_read(0x00, &EP0BUF[0]); + bcm_indirect_read(0x01, &EP0BUF[1]); + /* AGC1: indirect regs 0x02-0x03 */ + bcm_indirect_read(0x02, &EP0BUF[2]); + bcm_indirect_read(0x03, &EP0BUF[3]); + /* AGC2: indirect regs 0x04-0x05 */ + bcm_indirect_read(0x04, &EP0BUF[4]); + bcm_indirect_read(0x05, &EP0BUF[5]); + /* Lock register (direct 0xA4) */ + sm_val = 0; + bcm_direct_read(BCM_REG_LOCK, &sm_val); + EP0BUF[6] = sm_val; + /* Status register (direct 0xA2) */ + sm_val = 0; + bcm_direct_read(BCM_REG_STATUS, &sm_val); + EP0BUF[7] = sm_val; + EP0BCH = 0; + EP0BCL = 8; + return TRUE; + } + + /* 0xB8: TUNE_MONITOR -- tune + dwell + read in one round-trip + * OUT phase (0x40): receive 10-byte tune payload, tune, dwell, read signal + * IN phase (0xC0): return stored 10-byte result + * wValue = dwell time in ms (1-255) */ + case TUNE_MONITOR: { + if (SETUPDAT[0] & 0x80) { + /* IN phase: return stored result from previous OUT phase */ + BYTE ti; + for (ti = 0; ti < 10; ti++) + EP0BUF[ti] = tm_result[ti]; + EP0BCH = 0; + EP0BCL = 10; + } else { + /* OUT phase: tune, dwell, measure */ + BYTE dwell = (BYTE)wval; + EP0BCL = 0; + SYNCDELAY; + while (EP0CS & bmEPBUSY) + ; + do_tune(); + if (dwell > 0) + delay(dwell); + /* Read signal into result buffer */ + bcm_indirect_read(0x00, &tm_result[0]); + bcm_indirect_read(0x01, &tm_result[1]); + bcm_indirect_read(0x02, &tm_result[2]); + bcm_indirect_read(0x03, &tm_result[3]); + bcm_indirect_read(0x04, &tm_result[4]); + bcm_indirect_read(0x05, &tm_result[5]); + tm_result[6] = 0; + bcm_direct_read(BCM_REG_LOCK, &tm_result[6]); + tm_result[7] = 0; + bcm_direct_read(BCM_REG_STATUS, &tm_result[7]); + tm_result[8] = dwell; + tm_result[9] = (BYTE)(wval >> 8); + } + return TRUE; + } + + /* 0xB9: MULTI_REG_READ -- batch read of contiguous indirect registers + * wValue = start register, wIndex = count (1-64) + * Returns count bytes, one per register */ + case MULTI_REG_READ: { + BYTE start_reg = (BYTE)wval; + BYTE count = (BYTE)SETUP_INDEX(); + BYTE mi; + if (count == 0 || count > 64) + count = 1; + for (mi = 0; mi < count; mi++) { + EP0BUF[mi] = 0; + bcm_indirect_read(start_reg + mi, &EP0BUF[mi]); + } + EP0BCH = 0; + EP0BCL = count; + return TRUE; + } + default: return FALSE; } diff --git a/tools/skywalker.py b/tools/skywalker.py new file mode 100755 index 0000000..e7f35e4 --- /dev/null +++ b/tools/skywalker.py @@ -0,0 +1,1086 @@ +#!/usr/bin/env python3 +""" +Genpix SkyWalker-1 multi-mode RF tool. + +Modes: + spectrum - Sweep spectrum analyzer (950-2150 MHz IF range) + scan - Automated transponder scanner (sweep + blind scan) + monitor - Real-time signal strength at a single frequency + lband - L-band direct input analyzer (no LNB) + track - Carrier/beacon tracker with logging +""" + +import sys +import os +import argparse +import time +import signal +import csv +import json +import struct +import math +from datetime import datetime + +# Add tools directory to path for library import +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from skywalker_lib import ( + SkyWalker1, MODULATIONS, FEC_RATES, MOD_FEC_GROUP, + LNB_LO_LOW, LNB_LO_HIGH, LBAND_ALLOCATIONS, + CMD_BLIND_SCAN, + snr_raw_to_db, snr_raw_to_pct, agc_to_power_db, + detect_peaks, if_to_rf, signal_bar, format_config_bits, +) + + +# --- Terminal rendering --- + +# ANSI color codes for waterfall display +WATERFALL_COLORS = [ + "\033[38;5;17m", # dark blue (weakest) + "\033[38;5;19m", + "\033[38;5;21m", + "\033[38;5;27m", + "\033[38;5;33m", + "\033[38;5;39m", # cyan + "\033[38;5;46m", # green + "\033[38;5;82m", + "\033[38;5;118m", + "\033[38;5;154m", # yellow-green + "\033[38;5;190m", + "\033[38;5;226m", # yellow + "\033[38;5;214m", + "\033[38;5;208m", # orange + "\033[38;5;196m", # red (strongest) + "\033[38;5;160m", +] +ANSI_RESET = "\033[0m" + +# Unicode block characters for bar charts +BARS_H = " ▏▎▍▌▋▊▉█" + +# Sparkline characters +SPARKS = "▁▂▃▄▅▆▇█" + + +def power_color(power_db: float, floor: float = -40.0, ceil: float = 0.0) -> str: + """Map a power_db value to an ANSI color escape.""" + ratio = (power_db - floor) / (ceil - floor) + ratio = max(0.0, min(1.0, ratio)) + idx = int(ratio * (len(WATERFALL_COLORS) - 1)) + return WATERFALL_COLORS[idx] + + +def ascii_bar_h(value: float, max_val: float, width: int = 50) -> str: + """Render a horizontal bar using Unicode block characters.""" + if max_val == 0: + return "" + ratio = max(0.0, min(1.0, value / max_val)) + full_blocks = int(ratio * width) + remainder = (ratio * width) - full_blocks + partial_idx = int(remainder * (len(BARS_H) - 1)) + + bar = "█" * full_blocks + if full_blocks < width: + bar += BARS_H[partial_idx] + bar += " " * (width - full_blocks - 1) + return bar + + +def sparkline(values: list, width: int = 60) -> str: + """Render a sparkline from a list of values.""" + if not values: + return "" + # Take last 'width' values + vals = values[-width:] + mn = min(vals) + mx = max(vals) + rng = mx - mn if mx != mn else 1.0 + return "".join( + SPARKS[min(len(SPARKS) - 1, int((v - mn) / rng * (len(SPARKS) - 1)))] + for v in vals + ) + + +def clear_line(): + """Clear the current terminal line.""" + sys.stdout.write("\r\033[K") + + +# --- Mode: spectrum --- + +def cmd_spectrum(sw: SkyWalker1, args: argparse.Namespace) -> None: + """Sweep 950-2150 MHz (or custom range), display power-vs-frequency.""" + start = args.start + stop = args.stop + step = args.step + dwell = args.dwell + sr_ksps = args.sr + lnb_lo = args.lnb_lo + num_sweeps = args.sweeps + + steps = int((stop - start) / step) + 1 + est_time = steps * (dwell + 2) / 1000.0 # dwell + USB overhead + + print(f"SkyWalker-1 Spectrum Analyzer") + print(f"{'=' * 60}") + print(f" IF Range: {start}-{stop} MHz (step {step} MHz)") + if lnb_lo > 0: + print(f" RF Range: {start + lnb_lo:.0f}-{stop + lnb_lo:.0f} MHz (LNB LO {lnb_lo} MHz)") + else: + print(f" Direct input (no LNB offset)") + print(f" Steps: {steps}") + print(f" Dwell: {dwell} ms") + print(f" Symbol rate: {sr_ksps} ksps") + print(f" Est. sweep: {est_time:.1f}s") + if num_sweeps > 1: + print(f" Sweeps: {num_sweeps}") + print() + + sw.ensure_booted() + + csv_writer = None + csv_file = None + if args.csv: + csv_file = open(args.csv, 'w', newline='') + csv_writer = csv.writer(csv_file) + csv_writer.writerow(["sweep", "if_mhz", "rf_mhz", "snr_raw", "snr_db", + "agc1", "agc2", "power_db", "locked"]) + + all_sweeps = [] + + for sweep_num in range(num_sweeps): + if num_sweeps > 1: + print(f"\n--- Sweep {sweep_num + 1}/{num_sweeps} ---") + + def progress(freq, step_num, total, result): + pct = (step_num + 1) / total * 100 + rf = if_to_rf(freq, lnb_lo) + clear_line() + sys.stdout.write(f" [{pct:5.1f}%] {freq:.0f} MHz IF" + f" ({rf:.0f} MHz RF)" + f" SNR={result['snr_db']:.1f} dB" + f" AGC={result['agc1']}") + sys.stdout.flush() + + t0 = time.time() + freqs, powers, results = sw.sweep_spectrum( + start, stop, step, dwell, sr_ksps, + callback=progress if not args.waterfall else None + ) + elapsed = time.time() - t0 + clear_line() + print(f" Sweep complete: {len(freqs)} points in {elapsed:.1f}s") + + all_sweeps.append((freqs, powers, results)) + + # Write CSV + if csv_writer: + for i, (f, p, r) in enumerate(zip(freqs, powers, results)): + rf = if_to_rf(f, lnb_lo) + csv_writer.writerow([ + sweep_num, f"{f:.1f}", f"{rf:.1f}", + r["snr_raw"], f"{r['snr_db']:.2f}", + r["agc1"], r["agc2"], + f"{p:.2f}", int(r["locked"]) + ]) + + # Terminal bar chart display + if not args.waterfall: + print() + p_min = min(powers) if powers else -40 + p_max = max(powers) if powers else 0 + p_range = p_max - p_min if p_max != p_min else 1 + + for i, (f, p) in enumerate(zip(freqs, powers)): + rf = if_to_rf(f, lnb_lo) + bar = ascii_bar_h(p - p_min, p_range, width=40) + label = f"{rf:7.0f}" if lnb_lo > 0 else f"{f:7.0f}" + locked = results[i]["locked"] + lock_mark = " *" if locked else "" + print(f" {label} |{bar}| {p:6.1f} dB{lock_mark}") + + # Waterfall display + if args.waterfall: + p_min = min(powers) if powers else -40 + p_max = max(powers) if powers else 0 + line = "" + for p in powers: + color = power_color(p, p_min, p_max) + line += f"{color}█{ANSI_RESET}" + ts = datetime.now().strftime("%H:%M:%S") + print(f" {ts} {line}") + + # Peak detection + if not args.waterfall and freqs: + peaks = detect_peaks(freqs, powers, threshold_db=args.threshold if hasattr(args, 'threshold') else 3.0) + if peaks: + print(f"\n Peaks ({len(peaks)} found):") + for freq, pwr, idx in peaks: + rf = if_to_rf(freq, lnb_lo) + locked = results[idx]["locked"] + lock_str = " LOCKED" if locked else "" + label = f"{rf:.0f} MHz RF" if lnb_lo > 0 else f"{freq:.0f} MHz" + print(f" {label} {pwr:.1f} dB{lock_str}") + + if csv_file: + csv_file.close() + print(f"\n CSV saved: {args.csv}") + + # Matplotlib plot + if args.plot: + _plot_spectrum(freqs, powers, lnb_lo, all_sweeps) + + +def _plot_spectrum(freqs, powers, lnb_lo, all_sweeps=None): + """Show matplotlib spectrum plot.""" + try: + import matplotlib.pyplot as plt + except ImportError: + print(" matplotlib required for --plot: pip install matplotlib") + return + + rf_freqs = [if_to_rf(f, lnb_lo) for f in freqs] + x_label = "Frequency (MHz RF)" if lnb_lo > 0 else "Frequency (MHz IF)" + + fig, ax = plt.subplots(figsize=(14, 6)) + ax.plot(rf_freqs, powers, '-', linewidth=0.8, color='#2196F3') + ax.fill_between(rf_freqs, min(powers), powers, alpha=0.15, color='#2196F3') + ax.set_xlabel(x_label) + ax.set_ylabel("Power (dB, relative)") + ax.set_title("SkyWalker-1 Spectrum") + ax.grid(True, alpha=0.3) + + # Mark peaks + peaks = detect_peaks(freqs, powers) + if peaks: + peak_x = [if_to_rf(f, lnb_lo) for f, _, _ in peaks] + peak_y = [p for _, p, _ in peaks] + ax.scatter(peak_x, peak_y, color='#F44336', zorder=5, s=30) + for px, py in zip(peak_x, peak_y): + ax.annotate(f"{px:.0f}", (px, py), textcoords="offset points", + xytext=(0, 8), ha='center', fontsize=7, color='#F44336') + + plt.tight_layout() + plt.show() + + +# --- Mode: scan --- + +def cmd_scan(sw: SkyWalker1, args: argparse.Namespace) -> None: + """Automated transponder scanner: coarse sweep, peak detect, blind scan.""" + start = args.start + stop = args.stop + lnb_lo = args.lnb_lo + threshold = args.threshold + sr_min = args.sr_min * 1000 + sr_max = args.sr_max * 1000 + sr_step = args.sr_step * 1000 + + print(f"SkyWalker-1 Transponder Scanner") + print(f"{'=' * 60}") + print(f" IF Range: {start}-{stop} MHz") + if lnb_lo > 0: + print(f" RF Range: {start + lnb_lo:.0f}-{stop + lnb_lo:.0f} MHz") + print(f" Threshold: {threshold} dB above noise floor") + print(f" SR Range: {args.sr_min}-{args.sr_max} ksps (step {args.sr_step})") + print() + + sw.ensure_booted() + + # Configure LNB + if args.pol: + sw.set_lnb_voltage(args.pol.upper() in ("H", "L")) + if args.band: + sw.set_22khz_tone(args.band == "high") + + # Phase 1: Coarse spectrum sweep + print("[Phase 1] Coarse spectrum sweep...") + coarse_step = 10 # MHz + freqs, powers, results = sw.sweep_spectrum( + start, stop, coarse_step, dwell_ms=15, sr_ksps=20000 + ) + print(f" {len(freqs)} points measured") + + # Phase 2: Find peaks + print(f"\n[Phase 2] Peak detection (threshold {threshold} dB)...") + peaks = detect_peaks(freqs, powers, threshold_db=threshold) + if not peaks: + print(" No peaks found above threshold.") + print(" Try lowering --threshold or checking dish alignment.") + return + + print(f" {len(peaks)} candidate peaks:") + for freq, pwr, idx in peaks: + rf = if_to_rf(freq, lnb_lo) + print(f" {rf:.0f} MHz {pwr:.1f} dB") + + # Phase 2.5: Fine sweep around each peak + print(f"\n[Phase 2.5] Fine sweep around peaks...") + refined_peaks = [] + for freq, pwr, idx in peaks: + fine_start = max(start, freq - 15) + fine_stop = min(stop, freq + 15) + fine_freqs, fine_powers, fine_results = sw.sweep_spectrum( + fine_start, fine_stop, step_mhz=2.0, dwell_ms=20, sr_ksps=20000 + ) + if fine_powers: + best_idx = fine_powers.index(max(fine_powers)) + refined_peaks.append(( + fine_freqs[best_idx], + fine_powers[best_idx], + fine_results[best_idx] + )) + rf = if_to_rf(fine_freqs[best_idx], lnb_lo) + print(f" {rf:.0f} MHz {fine_powers[best_idx]:.1f} dB (refined)") + + # Phase 3: Blind scan at each refined peak + print(f"\n[Phase 3] Blind scan at {len(refined_peaks)} peaks...") + found = [] + + for freq, pwr, result in refined_peaks: + freq_khz_int = int(freq * 1000) + rf = if_to_rf(freq, lnb_lo) + print(f" Scanning {rf:.0f} MHz (IF {freq:.0f})...", end="", flush=True) + + # Build blind scan EP0 payload + payload = struct.pack('= 8 and resp[0] != 0: + found_freq = struct.unpack_from(' None: + """Real-time signal monitor / dish alignment aid.""" + freq_mhz = args.freq + sr_ksps = args.sr + lnb_lo = args.lnb_lo + rate = args.rate + poll_interval = 1.0 / rate + + # Calculate IF frequency + if lnb_lo > 0: + if_mhz = freq_mhz - lnb_lo + else: + if_mhz = freq_mhz + + if_khz = int(if_mhz * 1000) + sr_sps = sr_ksps * 1000 + + print(f"SkyWalker-1 Signal Monitor") + print(f"{'=' * 60}") + if lnb_lo > 0: + print(f" Frequency: {freq_mhz} MHz (IF {if_mhz:.0f} MHz, LNB LO {lnb_lo} MHz)") + else: + print(f" Frequency: {if_mhz} MHz (direct input)") + print(f" Symbol rate: {sr_ksps} ksps") + print(f" Poll rate: {rate} Hz") + if args.audio: + print(f" Audio: ON") + print(f"\n Press Ctrl-C to stop\n") + + sw.ensure_booted() + + # Configure LNB + if args.pol: + sw.set_lnb_voltage(args.pol.upper() in ("H", "L")) + if args.band: + sw.set_22khz_tone(args.band == "high") + + # Initial tune + sw.tune(sr_sps, if_khz, 0, 5) # QPSK, auto-FEC + time.sleep(0.5) + + history = [] + peak_snr = 0.0 + peak_power = -99.0 + running = True + + def stop(signum, frame): + nonlocal running + running = False + + signal.signal(signal.SIGINT, stop) + signal.signal(signal.SIGTERM, stop) + + while running: + t0 = time.time() + + sig = sw.signal_monitor() + snr_db = sig["snr_db"] + power = sig["power_db"] + locked = sig["locked"] + agc1 = sig["agc1"] + + history.append(snr_db) + if args.peak_hold: + peak_snr = max(peak_snr, snr_db) + peak_power = max(peak_power, power) + + # Build display + lock_str = "LOCK" if locked else "----" + bar = signal_bar(sig["snr_pct"], width=35) + + clear_line() + line = f" [{lock_str}] SNR {snr_db:5.1f} dB AGC {agc1:5d} {bar}" + if args.peak_hold: + line += f" peak {peak_snr:.1f} dB" + sys.stdout.write(line) + + # Sparkline history + if len(history) > 1: + spark = sparkline(history, width=min(40, len(history))) + sys.stdout.write(f"\n History: {spark}") + sys.stdout.write("\033[F") # cursor up + + sys.stdout.flush() + + # Audio feedback + if args.audio: + _beep_proportional(snr_db) + + # Pace the polling + elapsed = time.time() - t0 + sleep_time = poll_interval - elapsed + if sleep_time > 0: + time.sleep(sleep_time) + + print(f"\n\n Stopped. {len(history)} samples collected.") + if args.peak_hold: + print(f" Peak SNR: {peak_snr:.1f} dB") + + if args.plot: + _plot_monitor_history(history, rate) + + +def _beep_proportional(snr_db: float): + """Emit a pitch-proportional beep for dish alignment.""" + # Map SNR 0-15 dB to frequency 200-2000 Hz + freq = int(200 + min(15, max(0, snr_db)) / 15 * 1800) + duration_ms = 50 + try: + sys.stdout.write(f"\033[10;{freq}]\033[11;{duration_ms}]\a") + sys.stdout.flush() + except Exception: + pass # Terminal doesn't support bell frequency control + + +def _plot_monitor_history(history, rate): + """Plot signal monitor history with matplotlib.""" + try: + import matplotlib.pyplot as plt + except ImportError: + print(" matplotlib required for --plot: pip install matplotlib") + return + + t = [i / rate for i in range(len(history))] + fig, ax = plt.subplots(figsize=(12, 5)) + ax.plot(t, history, '-', linewidth=0.8, color='#4CAF50') + ax.set_xlabel("Time (s)") + ax.set_ylabel("SNR (dB)") + ax.set_title("Signal Monitor History") + ax.grid(True, alpha=0.3) + plt.tight_layout() + plt.show() + + +# --- Mode: lband --- + +def cmd_lband(sw: SkyWalker1, args: argparse.Namespace) -> None: + """L-band direct input spectrum analyzer with allocation annotations.""" + start = args.start + stop = args.stop + step = args.step + dwell = args.dwell + + # Narrow to 23cm if requested + if args.ham_23cm: + start = 1240 + stop = 1300 + step = 0.5 + + steps = int((stop - start) / step) + 1 + est_time = steps * (dwell + 2) / 1000.0 + + print(f"SkyWalker-1 L-Band Analyzer") + print(f"{'=' * 60}") + print(f" Range: {start}-{stop} MHz (direct input, no LNB)") + print(f" Steps: {steps} (step {step} MHz, dwell {dwell} ms)") + print(f" Est. sweep: {est_time:.1f}s") + print() + print(" NOTE: Can detect carrier PRESENCE at any frequency even if") + print(" modulation is incompatible with the BCM4500 demodulator.") + print() + + sw.ensure_booted() + + # Disable LNB power for direct input + sw.start_intersil(on=False) + time.sleep(0.1) + + # Show band info + if args.band_info: + print(f" L-Band Allocations in range:") + for lo, hi, name in LBAND_ALLOCATIONS: + if lo < stop and hi > start: + overlap_lo = max(lo, start) + overlap_hi = min(hi, stop) + print(f" {overlap_lo:7.0f}-{overlap_hi:<7.0f} MHz {name}") + print() + + # Sweep + def progress(freq, step_num, total, result): + pct = (step_num + 1) / total * 100 + clear_line() + sys.stdout.write(f" [{pct:5.1f}%] {freq:.1f} MHz" + f" SNR={result['snr_db']:.1f} dB" + f" AGC={result['agc1']}") + sys.stdout.flush() + + t0 = time.time() + freqs, powers, results = sw.sweep_spectrum( + start, stop, step, dwell, sr_ksps=20000, + callback=progress if not args.waterfall else None + ) + elapsed = time.time() - t0 + clear_line() + print(f" Sweep complete: {len(freqs)} points in {elapsed:.1f}s\n") + + # CSV output + csv_writer = None + csv_file = None + if args.csv: + csv_file = open(args.csv, 'w', newline='') + csv_writer = csv.writer(csv_file) + csv_writer.writerow(["freq_mhz", "snr_raw", "snr_db", "agc1", "agc2", + "power_db", "locked", "allocation"]) + for i, (f, p, r) in enumerate(zip(freqs, powers, results)): + alloc = _freq_allocation(f) + csv_writer.writerow([ + f"{f:.1f}", r["snr_raw"], f"{r['snr_db']:.2f}", + r["agc1"], r["agc2"], f"{p:.2f}", int(r["locked"]), + alloc + ]) + csv_file.close() + print(f" CSV saved: {args.csv}") + + # Terminal display with allocation annotations + if not args.waterfall: + p_min = min(powers) if powers else -40 + p_max = max(powers) if powers else 0 + p_range = p_max - p_min if p_max != p_min else 1 + last_alloc = "" + + for i, (f, p) in enumerate(zip(freqs, powers)): + bar = ascii_bar_h(p - p_min, p_range, width=35) + locked = results[i]["locked"] + lock_mark = " *" if locked else "" + alloc = _freq_allocation(f) + + # Print allocation header when it changes + if alloc != last_alloc and alloc: + print(f" {'─' * 55}") + print(f" ┌ {alloc}") + last_alloc = alloc + elif alloc != last_alloc: + last_alloc = alloc + + print(f" {f:7.1f} |{bar}| {p:6.1f} dB{lock_mark}") + + # Waterfall + if args.waterfall: + p_min = min(powers) if powers else -40 + p_max = max(powers) if powers else 0 + line = "" + for p in powers: + color = power_color(p, p_min, p_max) + line += f"{color}█{ANSI_RESET}" + ts = datetime.now().strftime("%H:%M:%S") + print(f" {ts} {line}") + + # Peaks + peaks = detect_peaks(freqs, powers, threshold_db=3.0) + if peaks: + print(f"\n Peaks ({len(peaks)} found):") + for freq, pwr, idx in peaks: + alloc = _freq_allocation(freq) + alloc_str = f" [{alloc}]" if alloc else "" + locked = results[idx]["locked"] + lock_str = " LOCKED" if locked else "" + print(f" {freq:.1f} MHz {pwr:.1f} dB{lock_str}{alloc_str}") + + if args.plot: + _plot_lband(freqs, powers) + + +def _freq_allocation(freq_mhz: float) -> str: + """Return the allocation name for a given frequency, or empty string.""" + for lo, hi, name in LBAND_ALLOCATIONS: + if lo <= freq_mhz <= hi: + return name + return "" + + +def _plot_lband(freqs, powers): + """L-band spectrum plot with allocation shading.""" + try: + import matplotlib.pyplot as plt + except ImportError: + print(" matplotlib required for --plot: pip install matplotlib") + return + + fig, ax = plt.subplots(figsize=(14, 6)) + ax.plot(freqs, powers, '-', linewidth=0.8, color='#FF9800') + ax.fill_between(freqs, min(powers), powers, alpha=0.1, color='#FF9800') + + # Shade allocations + colors = ['#E3F2FD', '#FFF3E0', '#E8F5E9', '#F3E5F5', + '#FBE9E7', '#E0F7FA', '#ECEFF1'] + for i, (lo, hi, name) in enumerate(LBAND_ALLOCATIONS): + if lo < max(freqs) and hi > min(freqs): + c = colors[i % len(colors)] + ax.axvspan(lo, hi, alpha=0.3, color=c, label=name) + mid = (lo + hi) / 2 + ax.text(mid, max(powers) * 0.95, name, + ha='center', fontsize=6, rotation=45) + + ax.set_xlabel("Frequency (MHz)") + ax.set_ylabel("Power (dB, relative)") + ax.set_title("SkyWalker-1 L-Band Spectrum") + ax.grid(True, alpha=0.3) + plt.tight_layout() + plt.show() + + +# --- Mode: track --- + +def cmd_track(sw: SkyWalker1, args: argparse.Namespace) -> None: + """Lock to a frequency and log power/lock/status over time.""" + freq_mhz = args.freq + sr_ksps = args.sr + lnb_lo = args.lnb_lo + rate = args.rate + duration = args.duration + poll_interval = 1.0 / rate + + if lnb_lo > 0: + if_mhz = freq_mhz - lnb_lo + else: + if_mhz = freq_mhz + + if_khz = int(if_mhz * 1000) + sr_sps = sr_ksps * 1000 + + print(f"SkyWalker-1 Carrier Tracker") + print(f"{'=' * 60}") + if lnb_lo > 0: + print(f" Frequency: {freq_mhz} MHz (IF {if_mhz:.0f} MHz)") + else: + print(f" Frequency: {if_mhz} MHz (direct)") + print(f" Symbol rate: {sr_ksps} ksps") + print(f" Poll rate: {rate} Hz") + if duration: + print(f" Duration: {duration}s") + if args.log: + print(f" Log file: {args.log}") + print(f"\n Press Ctrl-C to stop\n") + + sw.ensure_booted() + + if args.pol: + sw.set_lnb_voltage(args.pol.upper() in ("H", "L")) + if args.band: + sw.set_22khz_tone(args.band == "high") + + # Tune once + sw.tune(sr_sps, if_khz, 0, 5) + time.sleep(0.5) + + # Setup logging + log_file = None + log_writer = None + if args.log: + log_file = open(args.log, 'w', newline='') + log_writer = csv.writer(log_file) + log_writer.writerow(["timestamp", "elapsed_s", "snr_db", "agc1", "agc2", + "power_db", "locked", "lock_reg", "status_reg"]) + + jsonl_file = None + if args.json_lines: + jsonl_file = open(args.json_lines, 'w') + + history_snr = [] + history_power = [] + was_locked = None + sample_count = 0 + start_time = time.time() + running = True + + def stop_handler(signum, frame): + nonlocal running + running = False + + signal.signal(signal.SIGINT, stop_handler) + signal.signal(signal.SIGTERM, stop_handler) + + # Drift tracking window + drift_window = 5 # MHz each side + drift_history = [] + + while running: + if duration and (time.time() - start_time) >= duration: + break + + t0 = time.time() + elapsed = t0 - start_time + + sig = sw.signal_monitor() + snr_db = sig["snr_db"] + power = sig["power_db"] + locked = sig["locked"] + + history_snr.append(snr_db) + history_power.append(power) + sample_count += 1 + + # Lock state transitions + if was_locked is not None and locked != was_locked: + ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] + if locked: + print(f"\n [{ts}] >>> LOCK ACQUIRED SNR {snr_db:.1f} dB") + else: + print(f"\n [{ts}] <<< LOCK LOST") + was_locked = locked + + # Drift tracking + if args.drift_track and sample_count % (rate * 5) == 0: + # Every 5 seconds, do a narrow sweep to find peak + drift_start = max(950, if_mhz - drift_window) + drift_stop = min(2150, if_mhz + drift_window) + drift_freqs, drift_powers, _ = sw.sweep_spectrum( + drift_start, drift_stop, step_mhz=1.0, dwell_ms=5, sr_ksps=sr_ksps + ) + if drift_powers: + peak_idx = drift_powers.index(max(drift_powers)) + peak_freq = drift_freqs[peak_idx] + drift_hz = (peak_freq - if_mhz) * 1000 + drift_history.append((elapsed, drift_hz)) + if abs(drift_hz) > 100: + print(f"\n DRIFT: {drift_hz:+.0f} kHz from center") + # Re-tune to original + sw.tune(sr_sps, if_khz, 0, 5) + time.sleep(0.1) + + # Display + lock_str = "LOCK" if locked else "----" + bar = signal_bar(sig["snr_pct"], width=30) + clear_line() + sys.stdout.write(f" [{lock_str}] {elapsed:7.1f}s SNR {snr_db:5.1f} dB {bar}" + f" #{sample_count}") + sys.stdout.flush() + + # Log + ts_iso = datetime.now().isoformat() + if log_writer: + log_writer.writerow([ + ts_iso, f"{elapsed:.3f}", f"{snr_db:.2f}", + sig["agc1"], sig["agc2"], f"{power:.2f}", + int(locked), sig["lock"], sig["status"] + ]) + + if jsonl_file: + record = { + "ts": ts_iso, + "elapsed": round(elapsed, 3), + "snr_db": round(snr_db, 2), + "agc1": sig["agc1"], + "agc2": sig["agc2"], + "power_db": round(power, 2), + "locked": locked, + } + jsonl_file.write(json.dumps(record) + "\n") + + # Pace + sleep_time = poll_interval - (time.time() - t0) + if sleep_time > 0: + time.sleep(sleep_time) + + total_time = time.time() - start_time + print(f"\n\n Stopped. {sample_count} samples in {total_time:.1f}s") + + if log_file: + log_file.close() + print(f" Log saved: {args.log}") + if jsonl_file: + jsonl_file.close() + print(f" JSON-lines saved: {args.json_lines}") + + if args.plot: + _plot_track(history_snr, history_power, rate, drift_history) + + +def _plot_track(history_snr, history_power, rate, drift_history=None): + """Plot tracking history.""" + try: + import matplotlib.pyplot as plt + except ImportError: + print(" matplotlib required for --plot: pip install matplotlib") + return + + t = [i / rate for i in range(len(history_snr))] + + fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True) + + axes[0].plot(t, history_snr, '-', linewidth=0.6, color='#4CAF50') + axes[0].set_ylabel("SNR (dB)") + axes[0].set_title("Carrier Tracking") + axes[0].grid(True, alpha=0.3) + + axes[1].plot(t, history_power, '-', linewidth=0.6, color='#2196F3') + axes[1].set_ylabel("Power (dB)") + axes[1].set_xlabel("Time (s)") + axes[1].grid(True, alpha=0.3) + + if drift_history: + ax_drift = axes[0].twinx() + dt = [d[0] for d in drift_history] + dv = [d[1] for d in drift_history] + ax_drift.plot(dt, dv, 'o-', color='#F44336', markersize=3, linewidth=0.8) + ax_drift.set_ylabel("Drift (kHz)", color='#F44336') + + plt.tight_layout() + plt.show() + + +# --- CLI --- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="skywalker.py", + description="Genpix SkyWalker-1 multi-mode RF tool", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +examples: + %(prog)s spectrum --start 950 --stop 2150 --step 5 + %(prog)s spectrum --lnb-lo 9750 --start 950 --stop 2150 --plot + %(prog)s scan --lnb-lo 9750 --pol H --band high + %(prog)s monitor 12520 27500 --lnb-lo 9750 --pol H --rate 10 + %(prog)s lband --band-info + %(prog)s lband --23cm --plot + %(prog)s track 12520 27500 --lnb-lo 9750 --log signal.csv --duration 60 + +RF coverage (with different LNB configurations): + No LNB (direct): 950-2150 MHz (L-band: ham 23cm, GPS, GOES, HRPT) + Ku LNB (9750 LO): 10700-11900 MHz (satellite TV low band) + Ku LNB (10600 LO): 11550-12750 MHz (satellite TV high band) + Custom (9000 LO): 9950-11150 MHz (QO-100 DATV @ ~1491 MHz IF) +""") + parser.add_argument('-v', '--verbose', action='store_true', + help="Show raw USB traffic") + + sub = parser.add_subparsers(dest='command') + + # spectrum + p_spec = sub.add_parser('spectrum', help="Sweep spectrum analyzer", + formatter_class=argparse.RawDescriptionHelpFormatter) + p_spec.add_argument('--start', type=float, default=950, + help="Start IF frequency in MHz (default: 950)") + p_spec.add_argument('--stop', type=float, default=2150, + help="Stop IF frequency in MHz (default: 2150)") + p_spec.add_argument('--step', type=float, default=5, + help="Step size in MHz (default: 5)") + p_spec.add_argument('--dwell', type=int, default=10, + help="Dwell time per step in ms (default: 10)") + p_spec.add_argument('--lnb-lo', type=float, default=0, + help="LNB LO frequency in MHz (0=direct input)") + p_spec.add_argument('--sr', type=int, default=20000, + help="Symbol rate for measurement in ksps (default: 20000)") + p_spec.add_argument('--waterfall', action='store_true', + help="Waterfall display (time x frequency x power)") + p_spec.add_argument('--sweeps', type=int, default=1, + help="Number of sweeps (default: 1)") + p_spec.add_argument('--threshold', type=float, default=3.0, + help="Peak detection threshold in dB (default: 3)") + p_spec.add_argument('--plot', action='store_true', + help="Show matplotlib plot") + p_spec.add_argument('--csv', metavar='FILE', + help="Save sweep data to CSV") + + # scan + p_scan = sub.add_parser('scan', help="Automated transponder scanner", + formatter_class=argparse.RawDescriptionHelpFormatter) + p_scan.add_argument('--start', type=float, default=950, + help="Start IF frequency in MHz (default: 950)") + p_scan.add_argument('--stop', type=float, default=2150, + help="Stop IF frequency in MHz (default: 2150)") + p_scan.add_argument('--threshold', type=float, default=3, + help="Peak detection threshold in dB (default: 3)") + p_scan.add_argument('--sr-min', type=int, default=1000, + help="Minimum symbol rate in ksps (default: 1000)") + p_scan.add_argument('--sr-max', type=int, default=30000, + help="Maximum symbol rate in ksps (default: 30000)") + p_scan.add_argument('--sr-step', type=int, default=500, + help="Symbol rate step in ksps (default: 500)") + p_scan.add_argument('--lnb-lo', type=float, default=9750, + help="LNB LO frequency in MHz (default: 9750)") + p_scan.add_argument('--pol', choices=['H', 'V', 'L', 'R'], + help="Polarization (sets LNB voltage)") + p_scan.add_argument('--band', choices=['low', 'high'], + help="LNB band (sets 22 kHz tone)") + p_scan.add_argument('--json', action='store_true', + help="Output results as JSON") + p_scan.add_argument('--csv', metavar='FILE', + help="Save results to CSV") + + # monitor + p_mon = sub.add_parser('monitor', help="Real-time signal monitor / dish alignment", + formatter_class=argparse.RawDescriptionHelpFormatter) + p_mon.add_argument('freq', type=float, + help="Frequency in MHz (RF if --lnb-lo set, IF otherwise)") + p_mon.add_argument('sr', type=int, + help="Symbol rate in ksps") + p_mon.add_argument('--pol', choices=['H', 'V', 'L', 'R'], + help="Polarization (sets LNB voltage)") + p_mon.add_argument('--band', choices=['low', 'high'], + help="LNB band (sets 22 kHz tone)") + p_mon.add_argument('--lnb-lo', type=float, default=0, + help="LNB LO frequency in MHz (0=direct input)") + p_mon.add_argument('--rate', type=float, default=10, + help="Poll rate in Hz (default: 10, max ~50)") + p_mon.add_argument('--audio', action='store_true', + help="Pitch-proportional beep for hands-free alignment") + p_mon.add_argument('--peak-hold', action='store_true', + help="Track and display maximum signal seen") + p_mon.add_argument('--history', type=int, default=60, + help="Sparkline history length in samples (default: 60)") + p_mon.add_argument('--plot', action='store_true', + help="Show matplotlib plot after stopping") + + # lband + p_lband = sub.add_parser('lband', help="L-band direct input analyzer", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +L-band mode uses direct input (no LNB) to monitor 950-2150 MHz. +Can detect carrier PRESENCE at any frequency even if modulation +is incompatible with the BCM4500 demodulator. + +Known allocations in range: + 1240-1300 MHz Amateur 23cm + 1525-1559 MHz Inmarsat downlink + 1559-1610 MHz GNSS (GPS L1, Galileo E1) + 1610-1626 MHz Iridium downlink + 1670-1710 MHz MetSat (GOES LRIT, NOAA HRPT) + 1710-1785 MHz LTE/AWS uplink + 1920-2025 MHz UMTS uplink +""") + p_lband.add_argument('--start', type=float, default=950, + help="Start frequency in MHz (default: 950)") + p_lband.add_argument('--stop', type=float, default=2150, + help="Stop frequency in MHz (default: 2150)") + p_lband.add_argument('--step', type=float, default=2, + help="Step size in MHz (default: 2)") + p_lband.add_argument('--dwell', type=int, default=20, + help="Dwell time per step in ms (default: 20)") + p_lband.add_argument('--23cm', dest='ham_23cm', action='store_true', + help="Narrow to 1240-1300 MHz with 500 kHz steps") + p_lband.add_argument('--band-info', action='store_true', + help="Print L-band allocation table") + p_lband.add_argument('--waterfall', action='store_true', + help="Waterfall display") + p_lband.add_argument('--plot', action='store_true', + help="Show matplotlib plot") + p_lband.add_argument('--csv', metavar='FILE', + help="Save sweep data to CSV") + + # track + p_track = sub.add_parser('track', help="Carrier/beacon tracker with logging", + formatter_class=argparse.RawDescriptionHelpFormatter) + p_track.add_argument('freq', type=float, + help="Frequency in MHz (RF if --lnb-lo set, IF otherwise)") + p_track.add_argument('sr', type=int, + help="Symbol rate in ksps") + p_track.add_argument('--pol', choices=['H', 'V', 'L', 'R'], + help="Polarization (sets LNB voltage)") + p_track.add_argument('--band', choices=['low', 'high'], + help="LNB band (sets 22 kHz tone)") + p_track.add_argument('--lnb-lo', type=float, default=0, + help="LNB LO frequency in MHz (0=direct input)") + p_track.add_argument('--rate', type=float, default=1, + help="Poll rate in Hz (default: 1)") + p_track.add_argument('--duration', type=float, default=None, + help="Tracking duration in seconds (default: until Ctrl-C)") + p_track.add_argument('--log', metavar='FILE', + help="Log CSV: timestamp, snr, agc, power, lock, status") + p_track.add_argument('--drift-track', action='store_true', + help="Periodically sweep narrow window to measure frequency drift") + p_track.add_argument('--plot', action='store_true', + help="Show matplotlib plot after stopping") + p_track.add_argument('--json-lines', metavar='FILE', + help="Log as JSON-lines (one JSON object per line)") + + return parser + + +def main(): + parser = build_parser() + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(0) + + dispatch = { + 'spectrum': cmd_spectrum, + 'scan': cmd_scan, + 'monitor': cmd_monitor, + 'lband': cmd_lband, + 'track': cmd_track, + } + + handler = dispatch.get(args.command) + if handler is None: + parser.print_help() + sys.exit(1) + + with SkyWalker1(verbose=args.verbose) as sw: + handler(sw, args) + + +if __name__ == '__main__': + main() diff --git a/tools/skywalker_lib.py b/tools/skywalker_lib.py new file mode 100644 index 0000000..05c926d --- /dev/null +++ b/tools/skywalker_lib.py @@ -0,0 +1,565 @@ +#!/usr/bin/env python3 +""" +Genpix SkyWalker-1 shared library. + +Provides the SkyWalker1 USB interface class, constants, and signal +processing utilities used by skywalker.py and tune.py. +""" + +import sys +import struct +import time +import math + +try: + import usb.core + import usb.util +except ImportError: + print("pyusb required: pip install pyusb") + sys.exit(1) + + +# --- USB identifiers --- + +VENDOR_ID = 0x09C0 +PRODUCT_ID = 0x0203 +EP2_ADDR = 0x82 +EP2_URB_SIZE = 8192 + +# --- Vendor commands --- + +CMD_GET_8PSK_CONFIG = 0x80 +CMD_I2C_WRITE = 0x83 +CMD_I2C_READ = 0x84 +CMD_ARM_TRANSFER = 0x85 +CMD_TUNE_8PSK = 0x86 +CMD_GET_SIGNAL_STRENGTH = 0x87 +CMD_LOAD_BCM4500 = 0x88 +CMD_BOOT_8PSK = 0x89 +CMD_START_INTERSIL = 0x8A +CMD_SET_LNB_VOLTAGE = 0x8B +CMD_SET_22KHZ_TONE = 0x8C +CMD_SEND_DISEQC = 0x8D +CMD_GET_SIGNAL_LOCK = 0x90 +CMD_GET_FW_VERS = 0x92 +CMD_GET_SERIAL_NUMBER = 0x93 +CMD_USE_EXTRA_VOLT = 0x94 + +# Custom commands (v3.01+) +CMD_SPECTRUM_SWEEP = 0xB0 +CMD_RAW_DEMOD_READ = 0xB1 +CMD_RAW_DEMOD_WRITE = 0xB2 +CMD_BLIND_SCAN = 0xB3 +CMD_I2C_BUS_SCAN = 0xB4 +CMD_I2C_RAW_READ = 0xB5 +CMD_I2C_DIAG = 0xB6 + +# Custom commands (v3.02+) +CMD_SIGNAL_MONITOR = 0xB7 +CMD_TUNE_MONITOR = 0xB8 +CMD_MULTI_REG_READ = 0xB9 + +# --- Config status bits --- + +CONFIG_BITS = { + 0x01: ("8PSK Started", "bm8pskStarted"), + 0x02: ("BCM4500 FW Loaded", "bm8pskFW_Loaded"), + 0x04: ("LNB Power On", "bmIntersilOn"), + 0x08: ("DVB Mode", "bmDVBmode"), + 0x10: ("22 kHz Tone", "bm22kHz"), + 0x20: ("18V Selected", "bmSEL18V"), + 0x40: ("DC Tuned", "bmDCtuned"), + 0x80: ("Armed (streaming)", "bmArmed"), +} + +# --- Modulation and FEC tables --- + +MODULATIONS = { + "qpsk": (0, "DVB-S QPSK"), + "turbo-qpsk": (1, "Turbo QPSK"), + "turbo-8psk": (2, "Turbo 8PSK"), + "turbo-16qam": (3, "Turbo 16QAM"), + "dcii-combo": (4, "DCII Combo"), + "dcii-i": (5, "DCII I-stream"), + "dcii-q": (6, "DCII Q-stream"), + "dcii-oqpsk": (7, "DCII Offset QPSK"), + "dss": (8, "DSS QPSK"), + "bpsk": (9, "DVB BPSK"), +} + +FEC_RATES = { + "dvbs": { + "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, + "7/8": 4, "auto": 5, "none": 6, + }, + "turbo": { + "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, "auto": 4, + }, + "turbo-16qam": { + "3/4": 0, "auto": 0, + }, + "dcii": { + "1/2": 0, "2/3": 1, "6/7": 2, "3/4": 3, "5/11": 4, + "1/2+": 5, "2/3+": 6, "6/7+": 7, "3/4+": 8, "auto": 0, + }, +} + +MOD_FEC_GROUP = { + "qpsk": "dvbs", + "turbo-qpsk": "turbo", + "turbo-8psk": "turbo", + "turbo-16qam": "turbo-16qam", + "dcii-combo": "dcii", + "dcii-i": "dcii", + "dcii-q": "dcii", + "dcii-oqpsk": "dcii", + "dss": "dvbs", + "bpsk": "dvbs", +} + +# --- LNB defaults --- + +LNB_LO_LOW = 9750 # Universal LNB low-band (MHz) +LNB_LO_HIGH = 10600 # Universal LNB high-band (MHz) + +# --- L-band allocations (for annotation) --- + +LBAND_ALLOCATIONS = [ + (1240, 1300, "Amateur 23cm"), + (1525, 1559, "Inmarsat downlink"), + (1559, 1610, "GNSS (GPS L1, Galileo E1)"), + (1610, 1626, "Iridium downlink"), + (1670, 1710, "MetSat (GOES LRIT, NOAA HRPT)"), + (1710, 1785, "LTE/AWS uplink"), + (1920, 2025, "UMTS uplink"), +] + + +# --- Signal processing helpers --- + +def snr_raw_to_db(snr_raw: int) -> float: + """Convert BCM4500 SNR register to dB. Register is dBu * 256.""" + return snr_raw / 256.0 + + +def snr_raw_to_pct(snr_raw: int) -> float: + """Convert raw SNR to percentage (0-100 scale, clamped).""" + scaled = min(snr_raw * 17, 65535) + return (scaled / 65535) * 100 + + +def agc_to_power_db(agc1: int, agc2: int) -> float: + """ + Estimate received power from AGC register values. + + The AGC loop adjusts gain to keep the signal level constant at the + ADC input. Higher AGC = weaker signal (more gain needed). This is + an approximation; the exact mapping depends on the BCM3440 tuner's + gain curve. + + Returns a relative dB value (higher = stronger signal). + """ + # AGC1 is the primary gain control, AGC2 is fine adjustment. + # Invert: low AGC value = high signal = high power. + # Scale to approximate dB with ~40 dB dynamic range. + combined = agc1 + (agc2 >> 4) + if combined == 0: + return 0.0 + # Rough linear-to-dB: 65535 AGC ≈ -40 dB, 0 AGC ≈ 0 dB + return -40.0 * (combined / 65535.0) + + +def detect_peaks(freqs: list, powers: list, threshold_db: float = 3.0) -> list: + """ + Find peaks in a spectrum sweep. + + Returns list of (freq_mhz, power_db, index) tuples for each local + maximum that exceeds the noise floor by threshold_db. + """ + if len(powers) < 3: + return [] + + # Estimate noise floor as the 25th percentile + sorted_p = sorted(powers) + noise_floor = sorted_p[len(sorted_p) // 4] + + peaks = [] + for i in range(1, len(powers) - 1): + if powers[i] > powers[i - 1] and powers[i] > powers[i + 1]: + if powers[i] - noise_floor >= threshold_db: + peaks.append((freqs[i], powers[i], i)) + + return peaks + + +def if_to_rf(if_mhz: float, lnb_lo: float) -> float: + """Convert IF frequency to actual RF frequency given LNB LO.""" + return if_mhz + lnb_lo + + +def rf_to_if(rf_mhz: float, lnb_lo: float) -> float: + """Convert actual RF frequency to IF frequency given LNB LO.""" + return rf_mhz - lnb_lo + + +def signal_bar(pct: float, width: int = 40) -> str: + """Render an ASCII signal strength bar.""" + filled = int(pct / 100 * width) + filled = max(0, min(filled, width)) + bar = '#' * filled + '-' * (width - filled) + return f"[{bar}] {pct:.1f}%" + + +def format_config_bits(status: int) -> list: + """Return list of (bit_name, is_set) tuples for config byte.""" + result = [] + for bit, (name, _field) in CONFIG_BITS.items(): + result.append((name, bool(status & bit))) + return result + + +# --- SkyWalker1 USB interface --- + +class SkyWalker1: + """USB interface to the Genpix SkyWalker-1 DVB-S receiver.""" + + def __init__(self, verbose: bool = False): + self.dev = None + self.detached_intf = None + self.verbose = verbose + + def open(self) -> None: + """Find and claim the SkyWalker-1 USB device.""" + self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) + if self.dev is None: + print("SkyWalker-1 not found (VID 0x09C0, PID 0x0203). Is it plugged in?") + sys.exit(1) + + for cfg in self.dev: + for intf in cfg: + if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): + try: + self.dev.detach_kernel_driver(intf.bInterfaceNumber) + self.detached_intf = intf.bInterfaceNumber + if self.verbose: + print(f" Detached kernel driver from interface {intf.bInterfaceNumber}") + except usb.core.USBError as e: + print(f"Cannot detach kernel driver: {e}") + print("The gp8psk module must be unbound first. Try one of:") + print(" sudo modprobe -r dvb_usb_gp8psk") + print(" echo '' | sudo tee /sys/bus/usb/drivers/gp8psk/unbind") + sys.exit(1) + + try: + self.dev.set_configuration() + except usb.core.USBError: + pass + + def close(self) -> None: + """Release device and re-attach kernel driver.""" + if self.dev is None: + return + if self.detached_intf is not None: + try: + usb.util.release_interface(self.dev, self.detached_intf) + self.dev.attach_kernel_driver(self.detached_intf) + if self.verbose: + print("Re-attached kernel driver") + except usb.core.USBError: + print("Note: run 'sudo modprobe dvb_usb_gp8psk' to reload driver") + + def __enter__(self): + self.open() + return self + + def __exit__(self, *exc): + self.close() + + # -- Low-level USB transfers -- + + def _vendor_in(self, request: int, value: int = 0, index: int = 0, + length: int = 64, retries: int = 3) -> bytes: + """Vendor IN control transfer (device-to-host), with retry.""" + for attempt in range(retries): + try: + data = self.dev.ctrl_transfer( + usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN, + request, value, index, length, 2000 + ) + if self.verbose: + raw = bytes(data).hex(' ') + print(f" USB IN req=0x{request:02X} val=0x{value:04X} " + f"idx=0x{index:04X} -> [{len(data)}] {raw}") + if len(data) == length: + return bytes(data) + if self.verbose: + print(f" Partial read ({len(data)}/{length}), retry {attempt + 1}") + continue + except usb.core.USBError as e: + if self.verbose: + print(f" USB IN req=0x{request:02X} FAILED: {e}") + if attempt == retries - 1: + raise + return bytes(data) + + def _vendor_out(self, request: int, value: int = 0, index: int = 0, + data: bytes = b'') -> int: + """Vendor OUT control transfer (host-to-device).""" + if self.verbose: + raw = data.hex(' ') if data else "(no data)" + print(f" USB OUT req=0x{request:02X} val=0x{value:04X} " + f"idx=0x{index:04X} data=[{len(data)}] {raw}") + return self.dev.ctrl_transfer( + usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT, + request, value, index, data, 2000 + ) + + # -- Device info -- + + def get_config(self) -> int: + """Read 8PSK config status byte.""" + data = self._vendor_in(CMD_GET_8PSK_CONFIG, length=1) + return data[0] + + def get_fw_version(self) -> dict: + """Read firmware version. Returns dict with version string and date.""" + data = self._vendor_in(CMD_GET_FW_VERS, length=6) + return { + "major": data[2], + "minor": data[1], + "patch": data[0], + "version": f"{data[2]}.{data[1]:02d}.{data[0]}", + "date": f"20{data[5]:02d}-{data[4]:02d}-{data[3]:02d}", + } + + def get_signal_lock(self) -> bool: + """Read signal lock status.""" + data = self._vendor_in(CMD_GET_SIGNAL_LOCK, length=1) + return data[0] != 0 + + def get_signal_strength(self) -> dict: + """Read signal strength. Returns SNR info dict.""" + data = self._vendor_in(CMD_GET_SIGNAL_STRENGTH, length=6) + snr_raw = struct.unpack_from(' int: + """Power on/off the 8PSK demodulator.""" + data = self._vendor_in(CMD_BOOT_8PSK, value=int(on), length=1) + return data[0] + + def start_intersil(self, on: bool = True) -> int: + """Enable/disable LNB power supply.""" + data = self._vendor_in(CMD_START_INTERSIL, value=int(on), length=1) + return data[0] + + def set_lnb_voltage(self, high: bool) -> None: + """Set LNB voltage: high=True for 18V, False for 13V.""" + self._vendor_out(CMD_SET_LNB_VOLTAGE, value=int(high)) + + def set_22khz_tone(self, on: bool) -> None: + """Enable/disable 22 kHz tone.""" + self._vendor_out(CMD_SET_22KHZ_TONE, value=int(on)) + + def set_extra_voltage(self, on: bool) -> None: + """Enable +1V LNB boost: 13->14V, 18->19V.""" + self._vendor_out(CMD_USE_EXTRA_VOLT, value=int(on)) + + # -- Tuning -- + + def tune(self, symbol_rate_sps: int, freq_khz: int, + mod_index: int, fec_index: int) -> None: + """Send TUNE_8PSK with 10-byte payload.""" + payload = struct.pack(' None: + """Start/stop MPEG-2 transport stream.""" + self._vendor_out(CMD_ARM_TRANSFER, value=int(on)) + + def read_stream(self, size: int = EP2_URB_SIZE, + timeout: int = 1000) -> bytes: + """Read a chunk from the TS bulk endpoint.""" + try: + data = self.dev.read(EP2_ADDR, size, timeout) + return bytes(data) + except usb.core.USBTimeoutError: + return b'' + except usb.core.USBError as e: + if self.verbose: + print(f" EP2 read error: {e}") + return b'' + + # -- DiSEqC -- + + def send_diseqc_tone_burst(self, mini_cmd: int) -> None: + """Send tone burst (mini-DiSEqC). 0=SEC_MINI_A, 1=SEC_MINI_B.""" + self._vendor_out(CMD_SEND_DISEQC, value=mini_cmd) + + def send_diseqc_message(self, msg: bytes) -> None: + """Send full DiSEqC message (3-6 bytes).""" + if len(msg) < 3 or len(msg) > 6: + raise ValueError(f"DiSEqC message must be 3-6 bytes, got {len(msg)}") + self._vendor_out(CMD_SEND_DISEQC, value=msg[0], data=msg) + + # -- New commands (v3.02+) -- + + def signal_monitor(self) -> dict: + """ + Fast combined signal read (0xB7). Returns 8 bytes in one transfer: + SNR(2) + AGC1(2) + AGC2(2) + lock(1) + status(1). + """ + data = self._vendor_in(CMD_SIGNAL_MONITOR, length=8) + snr_raw = struct.unpack_from(' dict: + """ + Tune + dwell + signal read in one round-trip (0xB8). + + Sends tune parameters via OUT phase, firmware tunes + waits + dwell_ms + reads signal. Then IN phase returns the result. + """ + dwell_ms = max(1, min(255, dwell_ms)) + payload = struct.pack(' bytes: + """ + Batch read of contiguous BCM4500 indirect registers (0xB9). + + Returns count bytes, one per register. Up to 64 registers + in a single USB transfer (vs. individual 0xB1 reads). + """ + count = max(1, min(64, count)) + data = self._vendor_in(CMD_MULTI_REG_READ, value=start_reg, + index=count, length=count) + return bytes(data) + + # -- High-level sweep helpers -- + + def sweep_spectrum(self, start_mhz: float, stop_mhz: float, + step_mhz: float = 5.0, dwell_ms: int = 10, + sr_ksps: int = 20000, mod_index: int = 0, + fec_index: int = 5, + callback=None) -> tuple: + """ + Sweep a frequency range and return power measurements. + + Uses TUNE_MONITOR (0xB8) at each step for efficient measurement. + Default tune params: QPSK, auto-FEC, 20 Msps. + + callback(freq_mhz, step_num, total_steps, result) is called + per step if provided. + + Returns (freqs_mhz[], powers_db[], raw_results[]). + """ + sr_sps = sr_ksps * 1000 + freqs = [] + powers = [] + results = [] + + freq = start_mhz + steps = int((stop_mhz - start_mhz) / step_mhz) + 1 + step_num = 0 + + while freq <= stop_mhz: + freq_khz = int(freq * 1000) + result = self.tune_monitor(sr_sps, freq_khz, mod_index, + fec_index, dwell_ms) + freqs.append(freq) + powers.append(result["power_db"]) + results.append(result) + + if callback: + callback(freq, step_num, steps, result) + + step_num += 1 + freq += step_mhz + + return freqs, powers, results + + def ensure_booted(self) -> None: + """Boot demodulator and enable LNB power if not already running.""" + status = self.get_config() + if not (status & 0x01): + self.boot(on=True) + time.sleep(0.5) + status = self.get_config() + if not (status & 0x01): + raise RuntimeError("Device failed to start") + if not (status & 0x04): + self.start_intersil(on=True) + time.sleep(0.3) + + def configure_lnb(self, pol: str = None, band: str = None, + lnb_lo: float = None, disable_lnb: bool = False) -> float: + """ + Configure LNB voltage, tone, and return the effective LO frequency. + + pol: 'H'/'V'/'L'/'R' or None (don't change) + band: 'low'/'high' or None (don't change) + lnb_lo: explicit LO freq in MHz, or None for auto + disable_lnb: True to disable LNB power (for direct input) + """ + if disable_lnb: + self.start_intersil(on=False) + return 0.0 + + if pol: + high_voltage = pol.upper() in ("H", "L") + self.set_lnb_voltage(high_voltage) + + if band: + self.set_22khz_tone(band == "high") + + if lnb_lo is not None: + return lnb_lo + elif band == "high": + return LNB_LO_HIGH + else: + return LNB_LO_LOW diff --git a/tools/tune.py b/tools/tune.py index e37c889..75c2bc6 100755 --- a/tools/tune.py +++ b/tools/tune.py @@ -18,316 +18,18 @@ import json import signal import os -try: - import usb.core - import usb.util -except ImportError: - print("pyusb required: pip install pyusb") - sys.exit(1) +# Add tools directory to path for library import +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) -VENDOR_ID = 0x09C0 -PRODUCT_ID = 0x0203 +from skywalker_lib import ( + SkyWalker1, VENDOR_ID, PRODUCT_ID, EP2_URB_SIZE, + MODULATIONS, FEC_RATES, MOD_FEC_GROUP, + LNB_LO_LOW, LNB_LO_HIGH, + CONFIG_BITS, + signal_bar, format_config_bits, +) -# Streaming endpoint -EP2_ADDR = 0x82 -EP2_URB_SIZE = 8192 - -# Vendor commands -CMD_GET_8PSK_CONFIG = 0x80 -CMD_I2C_WRITE = 0x83 -CMD_I2C_READ = 0x84 -CMD_ARM_TRANSFER = 0x85 -CMD_TUNE_8PSK = 0x86 -CMD_GET_SIGNAL_STRENGTH = 0x87 -CMD_LOAD_BCM4500 = 0x88 -CMD_BOOT_8PSK = 0x89 -CMD_START_INTERSIL = 0x8A -CMD_SET_LNB_VOLTAGE = 0x8B -CMD_SET_22KHZ_TONE = 0x8C -CMD_SEND_DISEQC = 0x8D -CMD_GET_SIGNAL_LOCK = 0x90 -CMD_GET_FW_VERS = 0x92 -CMD_GET_SERIAL_NUMBER = 0x93 -CMD_USE_EXTRA_VOLT = 0x94 - -# Config status bits (GET_8PSK_CONFIG response) -CONFIG_BITS = { - 0x01: ("8PSK Started", "bm8pskStarted"), - 0x02: ("BCM4500 FW Loaded", "bm8pskFW_Loaded"), - 0x04: ("LNB Power On", "bmIntersilOn"), - 0x08: ("DVB Mode", "bmDVBmode"), - 0x10: ("22 kHz Tone", "bm22kHz"), - 0x20: ("18V Selected", "bmSEL18V"), - 0x40: ("DC Tuned", "bmDCtuned"), - 0x80: ("Armed (streaming)", "bmArmed"), -} - -# Modulation types for TUNE_8PSK byte 8 -MODULATIONS = { - "qpsk": (0, "DVB-S QPSK"), - "turbo-qpsk": (1, "Turbo QPSK"), - "turbo-8psk": (2, "Turbo 8PSK"), - "turbo-16qam": (3, "Turbo 16QAM"), - "dcii-combo": (4, "DCII Combo"), - "dcii-i": (5, "DCII I-stream"), - "dcii-q": (6, "DCII Q-stream"), - "dcii-oqpsk": (7, "DCII Offset QPSK"), - "dss": (8, "DSS QPSK"), - "bpsk": (9, "DVB BPSK"), -} - -# FEC rate indices per modulation group -FEC_RATES = { - "dvbs": { - "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, - "7/8": 4, "auto": 5, "none": 6, - }, - "turbo": { - "1/2": 0, "2/3": 1, "3/4": 2, "5/6": 3, "auto": 4, - }, - "turbo-16qam": { - "3/4": 0, "auto": 0, - }, - "dcii": { - "1/2": 0, "2/3": 1, "6/7": 2, "3/4": 3, "5/11": 4, - "1/2+": 5, "2/3+": 6, "6/7+": 7, "3/4+": 8, "auto": 0, - }, -} - -# Map modulation names to FEC group -MOD_FEC_GROUP = { - "qpsk": "dvbs", - "turbo-qpsk": "turbo", - "turbo-8psk": "turbo", - "turbo-16qam": "turbo-16qam", - "dcii-combo": "dcii", - "dcii-i": "dcii", - "dcii-q": "dcii", - "dcii-oqpsk": "dcii", - "dss": "dvbs", - "bpsk": "dvbs", -} - -# Default LNB LO frequencies (MHz) -LNB_LO_LOW = 9750 # Universal LNB low-band -LNB_LO_HIGH = 10600 # Universal LNB high-band - - -class SkyWalker1: - """USB interface to the Genpix SkyWalker-1 DVB-S receiver.""" - - def __init__(self, verbose: bool = False): - self.dev = None - self.detached_intf = None - self.verbose = verbose - - def open(self) -> None: - """Find and claim the SkyWalker-1 USB device.""" - self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) - if self.dev is None: - print("SkyWalker-1 not found (VID 0x09C0, PID 0x0203). Is it plugged in?") - sys.exit(1) - - # Detach kernel driver if bound - for cfg in self.dev: - for intf in cfg: - if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): - try: - self.dev.detach_kernel_driver(intf.bInterfaceNumber) - self.detached_intf = intf.bInterfaceNumber - if self.verbose: - print(f" Detached kernel driver from interface {intf.bInterfaceNumber}") - except usb.core.USBError as e: - print(f"Cannot detach kernel driver: {e}") - print("The gp8psk module must be unbound first. Try one of:") - print(" sudo modprobe -r dvb_usb_gp8psk") - print(" echo '' | sudo tee /sys/bus/usb/drivers/gp8psk/unbind") - sys.exit(1) - - try: - self.dev.set_configuration() - except usb.core.USBError: - pass # May already be configured - - def close(self) -> None: - """Release device and re-attach kernel driver.""" - if self.dev is None: - return - if self.detached_intf is not None: - try: - usb.util.release_interface(self.dev, self.detached_intf) - self.dev.attach_kernel_driver(self.detached_intf) - if self.verbose: - print("Re-attached kernel driver") - except usb.core.USBError: - print("Note: run 'sudo modprobe dvb_usb_gp8psk' to reload driver") - - def __enter__(self): - self.open() - return self - - def __exit__(self, *exc): - self.close() - - # -- Low-level USB transfers -- - - def _vendor_in(self, request: int, value: int = 0, index: int = 0, - length: int = 64, retries: int = 3) -> bytes: - """Vendor IN control transfer (device-to-host), with retry.""" - for attempt in range(retries): - try: - data = self.dev.ctrl_transfer( - usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN, - request, value, index, length, 2000 - ) - if self.verbose: - raw = bytes(data).hex(' ') - print(f" USB IN req=0x{request:02X} val=0x{value:04X} " - f"idx=0x{index:04X} -> [{len(data)}] {raw}") - if len(data) == length: - return bytes(data) - # Partial read, retry - if self.verbose: - print(f" Partial read ({len(data)}/{length}), retry {attempt + 1}") - continue - except usb.core.USBError as e: - if self.verbose: - print(f" USB IN req=0x{request:02X} FAILED: {e}") - if attempt == retries - 1: - raise - return bytes(data) - - def _vendor_out(self, request: int, value: int = 0, index: int = 0, - data: bytes = b'') -> int: - """Vendor OUT control transfer (host-to-device).""" - if self.verbose: - raw = data.hex(' ') if data else "(no data)" - print(f" USB OUT req=0x{request:02X} val=0x{value:04X} " - f"idx=0x{index:04X} data=[{len(data)}] {raw}") - return self.dev.ctrl_transfer( - usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT, - request, value, index, data, 2000 - ) - - # -- Device info commands -- - - def get_config(self) -> int: - """Read 8PSK config status byte (GET_8PSK_CONFIG 0x80).""" - data = self._vendor_in(CMD_GET_8PSK_CONFIG, length=1) - return data[0] - - def get_fw_version(self) -> dict: - """Read firmware version (GET_FW_VERS 0x92). Returns dict.""" - data = self._vendor_in(CMD_GET_FW_VERS, length=6) - return { - "major": data[2], - "minor": data[1], - "patch": data[0], - "version": f"{data[2]}.{data[1]:02d}.{data[0]}", - "date": f"20{data[5]:02d}-{data[4]:02d}-{data[3]:02d}", - } - - def get_signal_lock(self) -> bool: - """Read signal lock status (GET_SIGNAL_LOCK 0x90).""" - data = self._vendor_in(CMD_GET_SIGNAL_LOCK, length=1) - return data[0] != 0 - - def get_signal_strength(self) -> dict: - """Read signal strength (GET_SIGNAL_STRENGTH 0x87). Returns SNR info.""" - data = self._vendor_in(CMD_GET_SIGNAL_STRENGTH, length=6) - snr_raw = struct.unpack_from(' int: - """Power on/off the 8PSK demodulator (BOOT_8PSK 0x89).""" - data = self._vendor_in(CMD_BOOT_8PSK, value=int(on), length=1) - return data[0] - - def start_intersil(self, on: bool = True) -> int: - """Enable/disable LNB power supply (START_INTERSIL 0x8A).""" - data = self._vendor_in(CMD_START_INTERSIL, value=int(on), length=1) - return data[0] - - def set_lnb_voltage(self, high: bool) -> None: - """Set LNB voltage: high=True for 18V (H/L), high=False for 13V (V/R).""" - self._vendor_out(CMD_SET_LNB_VOLTAGE, value=int(high)) - - def set_22khz_tone(self, on: bool) -> None: - """Enable/disable 22 kHz tone (SET_22KHZ_TONE 0x8C).""" - self._vendor_out(CMD_SET_22KHZ_TONE, value=int(on)) - - def set_extra_voltage(self, on: bool) -> None: - """Enable +1V LNB boost: 13->14V, 18->19V (USE_EXTRA_VOLT 0x94).""" - self._vendor_out(CMD_USE_EXTRA_VOLT, value=int(on)) - - # -- Tuning -- - - def tune(self, symbol_rate_sps: int, freq_khz: int, - mod_index: int, fec_index: int) -> None: - """Send TUNE_8PSK (0x86) with 10-byte payload.""" - payload = struct.pack(' None: - """Start/stop MPEG-2 transport stream (ARM_TRANSFER 0x85).""" - self._vendor_out(CMD_ARM_TRANSFER, value=int(on)) - - def read_stream(self, size: int = EP2_URB_SIZE, - timeout: int = 1000) -> bytes: - """Read a chunk from the TS bulk endpoint (EP2 0x82).""" - try: - data = self.dev.read(EP2_ADDR, size, timeout) - return bytes(data) - except usb.core.USBTimeoutError: - return b'' - except usb.core.USBError as e: - if self.verbose: - print(f" EP2 read error: {e}") - return b'' - - # -- DiSEqC -- - - def send_diseqc_tone_burst(self, mini_cmd: int) -> None: - """Send tone burst (mini-DiSEqC). 0=SEC_MINI_A, 1=SEC_MINI_B.""" - self._vendor_out(CMD_SEND_DISEQC, value=mini_cmd) - - def send_diseqc_message(self, msg: bytes) -> None: - """Send full DiSEqC message (3-6 bytes). wValue = framing byte.""" - if len(msg) < 3 or len(msg) > 6: - raise ValueError(f"DiSEqC message must be 3-6 bytes, got {len(msg)}") - self._vendor_out(CMD_SEND_DISEQC, value=msg[0], data=msg) - - -# -- Signal bar rendering -- - -def signal_bar(pct: float, width: int = 40) -> str: - """Render a signal strength bar.""" - filled = int(pct / 100 * width) - filled = max(0, min(filled, width)) - bar = '#' * filled + '-' * (width - filled) - return f"[{bar}] {pct:.1f}%" - - -def format_config_bits(status: int) -> list: - """Return list of (bit_name, is_set) tuples for config byte.""" - result = [] - for bit, (name, _field) in CONFIG_BITS.items(): - result.append((name, bool(status & bit))) - return result +import usb.core # -- Subcommand handlers --