#!/usr/bin/env python3 """ Genpix SkyWalker-1 multi-mode RF tool. Modes: spectrum - Sweep spectrum analyzer (950-2150 MHz IF range) scan - Automated transponder scanner (sweep + blind scan) monitor - Real-time signal strength at a single frequency lband - L-band direct input analyzer (no LNB) track - Carrier/beacon tracker with logging """ import sys import os import argparse import time import signal import csv import json import struct import math from datetime import datetime # Add tools directory to path for library import sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from skywalker_lib import ( SkyWalker1, MODULATIONS, FEC_RATES, MOD_FEC_GROUP, LNB_LO_LOW, LNB_LO_HIGH, LBAND_ALLOCATIONS, CMD_BLIND_SCAN, snr_raw_to_db, snr_raw_to_pct, agc_to_power_db, detect_peaks, if_to_rf, signal_bar, format_config_bits, ) # --- Terminal rendering --- # ANSI color codes for waterfall display WATERFALL_COLORS = [ "\033[38;5;17m", # dark blue (weakest) "\033[38;5;19m", "\033[38;5;21m", "\033[38;5;27m", "\033[38;5;33m", "\033[38;5;39m", # cyan "\033[38;5;46m", # green "\033[38;5;82m", "\033[38;5;118m", "\033[38;5;154m", # yellow-green "\033[38;5;190m", "\033[38;5;226m", # yellow "\033[38;5;214m", "\033[38;5;208m", # orange "\033[38;5;196m", # red (strongest) "\033[38;5;160m", ] ANSI_RESET = "\033[0m" # Unicode block characters for bar charts BARS_H = " ▏▎▍▌▋▊▉█" # Sparkline characters SPARKS = "▁▂▃▄▅▆▇█" def power_color(power_db: float, floor: float = -40.0, ceil: float = 0.0) -> str: """Map a power_db value to an ANSI color escape.""" ratio = (power_db - floor) / (ceil - floor) ratio = max(0.0, min(1.0, ratio)) idx = int(ratio * (len(WATERFALL_COLORS) - 1)) return WATERFALL_COLORS[idx] def ascii_bar_h(value: float, max_val: float, width: int = 50) -> str: """Render a horizontal bar using Unicode block characters.""" if max_val == 0: return "" ratio = max(0.0, min(1.0, value / max_val)) full_blocks = int(ratio * width) remainder = (ratio * width) - full_blocks partial_idx = int(remainder * (len(BARS_H) - 1)) bar = "█" * full_blocks if full_blocks < width: bar += BARS_H[partial_idx] bar += " " * (width - full_blocks - 1) return bar def sparkline(values: list, width: int = 60) -> str: """Render a sparkline from a list of values.""" if not values: return "" # Take last 'width' values vals = values[-width:] mn = min(vals) mx = max(vals) rng = mx - mn if mx != mn else 1.0 return "".join( SPARKS[min(len(SPARKS) - 1, int((v - mn) / rng * (len(SPARKS) - 1)))] for v in vals ) def clear_line(): """Clear the current terminal line.""" sys.stdout.write("\r\033[K") # --- Mode: spectrum --- def cmd_spectrum(sw: SkyWalker1, args: argparse.Namespace) -> None: """Sweep 950-2150 MHz (or custom range), display power-vs-frequency.""" start = args.start stop = args.stop step = args.step dwell = args.dwell sr_ksps = args.sr lnb_lo = args.lnb_lo num_sweeps = args.sweeps steps = int((stop - start) / step) + 1 est_time = steps * (dwell + 2) / 1000.0 # dwell + USB overhead print(f"SkyWalker-1 Spectrum Analyzer") print(f"{'=' * 60}") print(f" IF Range: {start}-{stop} MHz (step {step} MHz)") if lnb_lo > 0: print(f" RF Range: {start + lnb_lo:.0f}-{stop + lnb_lo:.0f} MHz (LNB LO {lnb_lo} MHz)") else: print(f" Direct input (no LNB offset)") print(f" Steps: {steps}") print(f" Dwell: {dwell} ms") print(f" Symbol rate: {sr_ksps} ksps") print(f" Est. sweep: {est_time:.1f}s") if num_sweeps > 1: print(f" Sweeps: {num_sweeps}") print() sw.ensure_booted() csv_writer = None csv_file = None if args.csv: csv_file = open(args.csv, 'w', newline='') csv_writer = csv.writer(csv_file) csv_writer.writerow(["sweep", "if_mhz", "rf_mhz", "snr_raw", "snr_db", "agc1", "agc2", "power_db", "locked"]) all_sweeps = [] for sweep_num in range(num_sweeps): if num_sweeps > 1: print(f"\n--- Sweep {sweep_num + 1}/{num_sweeps} ---") def progress(freq, step_num, total, result): pct = (step_num + 1) / total * 100 rf = if_to_rf(freq, lnb_lo) clear_line() sys.stdout.write(f" [{pct:5.1f}%] {freq:.0f} MHz IF" f" ({rf:.0f} MHz RF)" f" SNR={result['snr_db']:.1f} dB" f" AGC={result['agc1']}") sys.stdout.flush() t0 = time.time() freqs, powers, results = sw.sweep_spectrum( start, stop, step, dwell, sr_ksps, callback=progress if not args.waterfall else None ) elapsed = time.time() - t0 clear_line() print(f" Sweep complete: {len(freqs)} points in {elapsed:.1f}s") all_sweeps.append((freqs, powers, results)) # Write CSV if csv_writer: for i, (f, p, r) in enumerate(zip(freqs, powers, results)): rf = if_to_rf(f, lnb_lo) csv_writer.writerow([ sweep_num, f"{f:.1f}", f"{rf:.1f}", r["snr_raw"], f"{r['snr_db']:.2f}", r["agc1"], r["agc2"], f"{p:.2f}", int(r["locked"]) ]) # Terminal bar chart display if not args.waterfall: print() p_min = min(powers) if powers else -40 p_max = max(powers) if powers else 0 p_range = p_max - p_min if p_max != p_min else 1 for i, (f, p) in enumerate(zip(freqs, powers)): rf = if_to_rf(f, lnb_lo) bar = ascii_bar_h(p - p_min, p_range, width=40) label = f"{rf:7.0f}" if lnb_lo > 0 else f"{f:7.0f}" locked = results[i]["locked"] lock_mark = " *" if locked else "" print(f" {label} |{bar}| {p:6.1f} dB{lock_mark}") # Waterfall display if args.waterfall: p_min = min(powers) if powers else -40 p_max = max(powers) if powers else 0 line = "" for p in powers: color = power_color(p, p_min, p_max) line += f"{color}█{ANSI_RESET}" ts = datetime.now().strftime("%H:%M:%S") print(f" {ts} {line}") # Peak detection if not args.waterfall and freqs: peaks = detect_peaks(freqs, powers, threshold_db=args.threshold if hasattr(args, 'threshold') else 3.0) if peaks: print(f"\n Peaks ({len(peaks)} found):") for freq, pwr, idx in peaks: rf = if_to_rf(freq, lnb_lo) locked = results[idx]["locked"] lock_str = " LOCKED" if locked else "" label = f"{rf:.0f} MHz RF" if lnb_lo > 0 else f"{freq:.0f} MHz" print(f" {label} {pwr:.1f} dB{lock_str}") if csv_file: csv_file.close() print(f"\n CSV saved: {args.csv}") # Matplotlib plot if args.plot: _plot_spectrum(freqs, powers, lnb_lo, all_sweeps) def _plot_spectrum(freqs, powers, lnb_lo, all_sweeps=None): """Show matplotlib spectrum plot.""" try: import matplotlib.pyplot as plt except ImportError: print(" matplotlib required for --plot: pip install matplotlib") return rf_freqs = [if_to_rf(f, lnb_lo) for f in freqs] x_label = "Frequency (MHz RF)" if lnb_lo > 0 else "Frequency (MHz IF)" fig, ax = plt.subplots(figsize=(14, 6)) ax.plot(rf_freqs, powers, '-', linewidth=0.8, color='#2196F3') ax.fill_between(rf_freqs, min(powers), powers, alpha=0.15, color='#2196F3') ax.set_xlabel(x_label) ax.set_ylabel("Power (dB, relative)") ax.set_title("SkyWalker-1 Spectrum") ax.grid(True, alpha=0.3) # Mark peaks peaks = detect_peaks(freqs, powers) if peaks: peak_x = [if_to_rf(f, lnb_lo) for f, _, _ in peaks] peak_y = [p for _, p, _ in peaks] ax.scatter(peak_x, peak_y, color='#F44336', zorder=5, s=30) for px, py in zip(peak_x, peak_y): ax.annotate(f"{px:.0f}", (px, py), textcoords="offset points", xytext=(0, 8), ha='center', fontsize=7, color='#F44336') plt.tight_layout() plt.show() # --- Mode: scan --- def cmd_scan(sw: SkyWalker1, args: argparse.Namespace) -> None: """Automated transponder scanner: coarse sweep, peak detect, blind scan.""" start = args.start stop = args.stop lnb_lo = args.lnb_lo threshold = args.threshold sr_min = args.sr_min * 1000 sr_max = args.sr_max * 1000 sr_step = args.sr_step * 1000 print(f"SkyWalker-1 Transponder Scanner") print(f"{'=' * 60}") print(f" IF Range: {start}-{stop} MHz") if lnb_lo > 0: print(f" RF Range: {start + lnb_lo:.0f}-{stop + lnb_lo:.0f} MHz") print(f" Threshold: {threshold} dB above noise floor") print(f" SR Range: {args.sr_min}-{args.sr_max} ksps (step {args.sr_step})") print() sw.ensure_booted() # Configure LNB if args.pol: sw.set_lnb_voltage(args.pol.upper() in ("H", "L")) if args.band: sw.set_22khz_tone(args.band == "high") # Phase 1: Coarse spectrum sweep print("[Phase 1] Coarse spectrum sweep...") coarse_step = 10 # MHz freqs, powers, results = sw.sweep_spectrum( start, stop, coarse_step, dwell_ms=15, sr_ksps=20000 ) print(f" {len(freqs)} points measured") # Phase 2: Find peaks print(f"\n[Phase 2] Peak detection (threshold {threshold} dB)...") peaks = detect_peaks(freqs, powers, threshold_db=threshold) if not peaks: print(" No peaks found above threshold.") print(" Try lowering --threshold or checking dish alignment.") return print(f" {len(peaks)} candidate peaks:") for freq, pwr, idx in peaks: rf = if_to_rf(freq, lnb_lo) print(f" {rf:.0f} MHz {pwr:.1f} dB") # Phase 2.5: Fine sweep around each peak print(f"\n[Phase 2.5] Fine sweep around peaks...") refined_peaks = [] for freq, pwr, idx in peaks: fine_start = max(start, freq - 15) fine_stop = min(stop, freq + 15) fine_freqs, fine_powers, fine_results = sw.sweep_spectrum( fine_start, fine_stop, step_mhz=2.0, dwell_ms=20, sr_ksps=20000 ) if fine_powers: best_idx = fine_powers.index(max(fine_powers)) refined_peaks.append(( fine_freqs[best_idx], fine_powers[best_idx], fine_results[best_idx] )) rf = if_to_rf(fine_freqs[best_idx], lnb_lo) print(f" {rf:.0f} MHz {fine_powers[best_idx]:.1f} dB (refined)") # Phase 3: Blind scan at each refined peak print(f"\n[Phase 3] Blind scan at {len(refined_peaks)} peaks...") found = [] for freq, pwr, result in refined_peaks: freq_khz_int = int(freq * 1000) rf = if_to_rf(freq, lnb_lo) print(f" Scanning {rf:.0f} MHz (IF {freq:.0f})...", end="", flush=True) # Build blind scan EP0 payload payload = struct.pack('= 8 and resp[0] != 0: found_freq = struct.unpack_from(' None: """Real-time signal monitor / dish alignment aid.""" freq_mhz = args.freq sr_ksps = args.sr lnb_lo = args.lnb_lo rate = args.rate poll_interval = 1.0 / rate # Calculate IF frequency if lnb_lo > 0: if_mhz = freq_mhz - lnb_lo else: if_mhz = freq_mhz if_khz = int(if_mhz * 1000) sr_sps = sr_ksps * 1000 print(f"SkyWalker-1 Signal Monitor") print(f"{'=' * 60}") if lnb_lo > 0: print(f" Frequency: {freq_mhz} MHz (IF {if_mhz:.0f} MHz, LNB LO {lnb_lo} MHz)") else: print(f" Frequency: {if_mhz} MHz (direct input)") print(f" Symbol rate: {sr_ksps} ksps") print(f" Poll rate: {rate} Hz") if args.audio: print(f" Audio: ON") print(f"\n Press Ctrl-C to stop\n") sw.ensure_booted() # Configure LNB if args.pol: sw.set_lnb_voltage(args.pol.upper() in ("H", "L")) if args.band: sw.set_22khz_tone(args.band == "high") # Initial tune sw.tune(sr_sps, if_khz, 0, 5) # QPSK, auto-FEC time.sleep(0.5) history = [] peak_snr = 0.0 peak_power = -99.0 running = True def stop(signum, frame): nonlocal running running = False signal.signal(signal.SIGINT, stop) signal.signal(signal.SIGTERM, stop) while running: t0 = time.time() sig = sw.signal_monitor() snr_db = sig["snr_db"] power = sig["power_db"] locked = sig["locked"] agc1 = sig["agc1"] history.append(snr_db) if args.peak_hold: peak_snr = max(peak_snr, snr_db) peak_power = max(peak_power, power) # Build display lock_str = "LOCK" if locked else "----" bar = signal_bar(sig["snr_pct"], width=35) clear_line() line = f" [{lock_str}] SNR {snr_db:5.1f} dB AGC {agc1:5d} {bar}" if args.peak_hold: line += f" peak {peak_snr:.1f} dB" sys.stdout.write(line) # Sparkline history if len(history) > 1: spark = sparkline(history, width=min(40, len(history))) sys.stdout.write(f"\n History: {spark}") sys.stdout.write("\033[F") # cursor up sys.stdout.flush() # Audio feedback if args.audio: _beep_proportional(snr_db) # Pace the polling elapsed = time.time() - t0 sleep_time = poll_interval - elapsed if sleep_time > 0: time.sleep(sleep_time) print(f"\n\n Stopped. {len(history)} samples collected.") if args.peak_hold: print(f" Peak SNR: {peak_snr:.1f} dB") if args.plot: _plot_monitor_history(history, rate) def _beep_proportional(snr_db: float): """Emit a pitch-proportional beep for dish alignment.""" # Map SNR 0-15 dB to frequency 200-2000 Hz freq = int(200 + min(15, max(0, snr_db)) / 15 * 1800) duration_ms = 50 try: sys.stdout.write(f"\033[10;{freq}]\033[11;{duration_ms}]\a") sys.stdout.flush() except Exception: pass # Terminal doesn't support bell frequency control def _plot_monitor_history(history, rate): """Plot signal monitor history with matplotlib.""" try: import matplotlib.pyplot as plt except ImportError: print(" matplotlib required for --plot: pip install matplotlib") return t = [i / rate for i in range(len(history))] fig, ax = plt.subplots(figsize=(12, 5)) ax.plot(t, history, '-', linewidth=0.8, color='#4CAF50') ax.set_xlabel("Time (s)") ax.set_ylabel("SNR (dB)") ax.set_title("Signal Monitor History") ax.grid(True, alpha=0.3) plt.tight_layout() plt.show() # --- Mode: lband --- def cmd_lband(sw: SkyWalker1, args: argparse.Namespace) -> None: """L-band direct input spectrum analyzer with allocation annotations.""" start = args.start stop = args.stop step = args.step dwell = args.dwell # Narrow to 23cm if requested if args.ham_23cm: start = 1240 stop = 1300 step = 0.5 steps = int((stop - start) / step) + 1 est_time = steps * (dwell + 2) / 1000.0 print(f"SkyWalker-1 L-Band Analyzer") print(f"{'=' * 60}") print(f" Range: {start}-{stop} MHz (direct input, no LNB)") print(f" Steps: {steps} (step {step} MHz, dwell {dwell} ms)") print(f" Est. sweep: {est_time:.1f}s") print() print(" NOTE: Can detect carrier PRESENCE at any frequency even if") print(" modulation is incompatible with the BCM4500 demodulator.") print() sw.ensure_booted() # Disable LNB power for direct input sw.start_intersil(on=False) time.sleep(0.1) # Show band info if args.band_info: print(f" L-Band Allocations in range:") for lo, hi, name in LBAND_ALLOCATIONS: if lo < stop and hi > start: overlap_lo = max(lo, start) overlap_hi = min(hi, stop) print(f" {overlap_lo:7.0f}-{overlap_hi:<7.0f} MHz {name}") print() # Sweep def progress(freq, step_num, total, result): pct = (step_num + 1) / total * 100 clear_line() sys.stdout.write(f" [{pct:5.1f}%] {freq:.1f} MHz" f" SNR={result['snr_db']:.1f} dB" f" AGC={result['agc1']}") sys.stdout.flush() t0 = time.time() freqs, powers, results = sw.sweep_spectrum( start, stop, step, dwell, sr_ksps=20000, callback=progress if not args.waterfall else None ) elapsed = time.time() - t0 clear_line() print(f" Sweep complete: {len(freqs)} points in {elapsed:.1f}s\n") # CSV output csv_writer = None csv_file = None if args.csv: csv_file = open(args.csv, 'w', newline='') csv_writer = csv.writer(csv_file) csv_writer.writerow(["freq_mhz", "snr_raw", "snr_db", "agc1", "agc2", "power_db", "locked", "allocation"]) for i, (f, p, r) in enumerate(zip(freqs, powers, results)): alloc = _freq_allocation(f) csv_writer.writerow([ f"{f:.1f}", r["snr_raw"], f"{r['snr_db']:.2f}", r["agc1"], r["agc2"], f"{p:.2f}", int(r["locked"]), alloc ]) csv_file.close() print(f" CSV saved: {args.csv}") # Terminal display with allocation annotations if not args.waterfall: p_min = min(powers) if powers else -40 p_max = max(powers) if powers else 0 p_range = p_max - p_min if p_max != p_min else 1 last_alloc = "" for i, (f, p) in enumerate(zip(freqs, powers)): bar = ascii_bar_h(p - p_min, p_range, width=35) locked = results[i]["locked"] lock_mark = " *" if locked else "" alloc = _freq_allocation(f) # Print allocation header when it changes if alloc != last_alloc and alloc: print(f" {'─' * 55}") print(f" ┌ {alloc}") last_alloc = alloc elif alloc != last_alloc: last_alloc = alloc print(f" {f:7.1f} |{bar}| {p:6.1f} dB{lock_mark}") # Waterfall if args.waterfall: p_min = min(powers) if powers else -40 p_max = max(powers) if powers else 0 line = "" for p in powers: color = power_color(p, p_min, p_max) line += f"{color}█{ANSI_RESET}" ts = datetime.now().strftime("%H:%M:%S") print(f" {ts} {line}") # Peaks peaks = detect_peaks(freqs, powers, threshold_db=3.0) if peaks: print(f"\n Peaks ({len(peaks)} found):") for freq, pwr, idx in peaks: alloc = _freq_allocation(freq) alloc_str = f" [{alloc}]" if alloc else "" locked = results[idx]["locked"] lock_str = " LOCKED" if locked else "" print(f" {freq:.1f} MHz {pwr:.1f} dB{lock_str}{alloc_str}") if args.plot: _plot_lband(freqs, powers) def _freq_allocation(freq_mhz: float) -> str: """Return the allocation name for a given frequency, or empty string.""" for lo, hi, name in LBAND_ALLOCATIONS: if lo <= freq_mhz <= hi: return name return "" def _plot_lband(freqs, powers): """L-band spectrum plot with allocation shading.""" try: import matplotlib.pyplot as plt except ImportError: print(" matplotlib required for --plot: pip install matplotlib") return fig, ax = plt.subplots(figsize=(14, 6)) ax.plot(freqs, powers, '-', linewidth=0.8, color='#FF9800') ax.fill_between(freqs, min(powers), powers, alpha=0.1, color='#FF9800') # Shade allocations colors = ['#E3F2FD', '#FFF3E0', '#E8F5E9', '#F3E5F5', '#FBE9E7', '#E0F7FA', '#ECEFF1'] for i, (lo, hi, name) in enumerate(LBAND_ALLOCATIONS): if lo < max(freqs) and hi > min(freqs): c = colors[i % len(colors)] ax.axvspan(lo, hi, alpha=0.3, color=c, label=name) mid = (lo + hi) / 2 ax.text(mid, max(powers) * 0.95, name, ha='center', fontsize=6, rotation=45) ax.set_xlabel("Frequency (MHz)") ax.set_ylabel("Power (dB, relative)") ax.set_title("SkyWalker-1 L-Band Spectrum") ax.grid(True, alpha=0.3) plt.tight_layout() plt.show() # --- Mode: track --- def cmd_track(sw: SkyWalker1, args: argparse.Namespace) -> None: """Lock to a frequency and log power/lock/status over time.""" freq_mhz = args.freq sr_ksps = args.sr lnb_lo = args.lnb_lo rate = args.rate duration = args.duration poll_interval = 1.0 / rate if lnb_lo > 0: if_mhz = freq_mhz - lnb_lo else: if_mhz = freq_mhz if_khz = int(if_mhz * 1000) sr_sps = sr_ksps * 1000 print(f"SkyWalker-1 Carrier Tracker") print(f"{'=' * 60}") if lnb_lo > 0: print(f" Frequency: {freq_mhz} MHz (IF {if_mhz:.0f} MHz)") else: print(f" Frequency: {if_mhz} MHz (direct)") print(f" Symbol rate: {sr_ksps} ksps") print(f" Poll rate: {rate} Hz") if duration: print(f" Duration: {duration}s") if args.log: print(f" Log file: {args.log}") print(f"\n Press Ctrl-C to stop\n") sw.ensure_booted() if args.pol: sw.set_lnb_voltage(args.pol.upper() in ("H", "L")) if args.band: sw.set_22khz_tone(args.band == "high") # Tune once sw.tune(sr_sps, if_khz, 0, 5) time.sleep(0.5) # Setup logging log_file = None log_writer = None if args.log: log_file = open(args.log, 'w', newline='') log_writer = csv.writer(log_file) log_writer.writerow(["timestamp", "elapsed_s", "snr_db", "agc1", "agc2", "power_db", "locked", "lock_reg", "status_reg"]) jsonl_file = None if args.json_lines: jsonl_file = open(args.json_lines, 'w') history_snr = [] history_power = [] was_locked = None sample_count = 0 start_time = time.time() running = True def stop_handler(signum, frame): nonlocal running running = False signal.signal(signal.SIGINT, stop_handler) signal.signal(signal.SIGTERM, stop_handler) # Drift tracking window drift_window = 5 # MHz each side drift_history = [] while running: if duration and (time.time() - start_time) >= duration: break t0 = time.time() elapsed = t0 - start_time sig = sw.signal_monitor() snr_db = sig["snr_db"] power = sig["power_db"] locked = sig["locked"] history_snr.append(snr_db) history_power.append(power) sample_count += 1 # Lock state transitions if was_locked is not None and locked != was_locked: ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] if locked: print(f"\n [{ts}] >>> LOCK ACQUIRED SNR {snr_db:.1f} dB") else: print(f"\n [{ts}] <<< LOCK LOST") was_locked = locked # Drift tracking if args.drift_track and sample_count % (rate * 5) == 0: # Every 5 seconds, do a narrow sweep to find peak drift_start = max(950, if_mhz - drift_window) drift_stop = min(2150, if_mhz + drift_window) drift_freqs, drift_powers, _ = sw.sweep_spectrum( drift_start, drift_stop, step_mhz=1.0, dwell_ms=5, sr_ksps=sr_ksps ) if drift_powers: peak_idx = drift_powers.index(max(drift_powers)) peak_freq = drift_freqs[peak_idx] drift_hz = (peak_freq - if_mhz) * 1000 drift_history.append((elapsed, drift_hz)) if abs(drift_hz) > 100: print(f"\n DRIFT: {drift_hz:+.0f} kHz from center") # Re-tune to original sw.tune(sr_sps, if_khz, 0, 5) time.sleep(0.1) # Display lock_str = "LOCK" if locked else "----" bar = signal_bar(sig["snr_pct"], width=30) clear_line() sys.stdout.write(f" [{lock_str}] {elapsed:7.1f}s SNR {snr_db:5.1f} dB {bar}" f" #{sample_count}") sys.stdout.flush() # Log ts_iso = datetime.now().isoformat() if log_writer: log_writer.writerow([ ts_iso, f"{elapsed:.3f}", f"{snr_db:.2f}", sig["agc1"], sig["agc2"], f"{power:.2f}", int(locked), sig["lock"], sig["status"] ]) if jsonl_file: record = { "ts": ts_iso, "elapsed": round(elapsed, 3), "snr_db": round(snr_db, 2), "agc1": sig["agc1"], "agc2": sig["agc2"], "power_db": round(power, 2), "locked": locked, } jsonl_file.write(json.dumps(record) + "\n") # Pace sleep_time = poll_interval - (time.time() - t0) if sleep_time > 0: time.sleep(sleep_time) total_time = time.time() - start_time print(f"\n\n Stopped. {sample_count} samples in {total_time:.1f}s") if log_file: log_file.close() print(f" Log saved: {args.log}") if jsonl_file: jsonl_file.close() print(f" JSON-lines saved: {args.json_lines}") if args.plot: _plot_track(history_snr, history_power, rate, drift_history) def _plot_track(history_snr, history_power, rate, drift_history=None): """Plot tracking history.""" try: import matplotlib.pyplot as plt except ImportError: print(" matplotlib required for --plot: pip install matplotlib") return t = [i / rate for i in range(len(history_snr))] fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True) axes[0].plot(t, history_snr, '-', linewidth=0.6, color='#4CAF50') axes[0].set_ylabel("SNR (dB)") axes[0].set_title("Carrier Tracking") axes[0].grid(True, alpha=0.3) axes[1].plot(t, history_power, '-', linewidth=0.6, color='#2196F3') axes[1].set_ylabel("Power (dB)") axes[1].set_xlabel("Time (s)") axes[1].grid(True, alpha=0.3) if drift_history: ax_drift = axes[0].twinx() dt = [d[0] for d in drift_history] dv = [d[1] for d in drift_history] ax_drift.plot(dt, dv, 'o-', color='#F44336', markersize=3, linewidth=0.8) ax_drift.set_ylabel("Drift (kHz)", color='#F44336') plt.tight_layout() plt.show() # --- CLI --- def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="skywalker.py", description="Genpix SkyWalker-1 multi-mode RF tool", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ examples: %(prog)s spectrum --start 950 --stop 2150 --step 5 %(prog)s spectrum --lnb-lo 9750 --start 950 --stop 2150 --plot %(prog)s scan --lnb-lo 9750 --pol H --band high %(prog)s monitor 12520 27500 --lnb-lo 9750 --pol H --rate 10 %(prog)s lband --band-info %(prog)s lband --23cm --plot %(prog)s track 12520 27500 --lnb-lo 9750 --log signal.csv --duration 60 RF coverage (with different LNB configurations): No LNB (direct): 950-2150 MHz (L-band: ham 23cm, GPS, GOES, HRPT) Ku LNB (9750 LO): 10700-11900 MHz (satellite TV low band) Ku LNB (10600 LO): 11550-12750 MHz (satellite TV high band) Custom (9000 LO): 9950-11150 MHz (QO-100 DATV @ ~1491 MHz IF) """) parser.add_argument('-v', '--verbose', action='store_true', help="Show raw USB traffic") sub = parser.add_subparsers(dest='command') # spectrum p_spec = sub.add_parser('spectrum', help="Sweep spectrum analyzer", formatter_class=argparse.RawDescriptionHelpFormatter) p_spec.add_argument('--start', type=float, default=950, help="Start IF frequency in MHz (default: 950)") p_spec.add_argument('--stop', type=float, default=2150, help="Stop IF frequency in MHz (default: 2150)") p_spec.add_argument('--step', type=float, default=5, help="Step size in MHz (default: 5)") p_spec.add_argument('--dwell', type=int, default=10, help="Dwell time per step in ms (default: 10)") p_spec.add_argument('--lnb-lo', type=float, default=0, help="LNB LO frequency in MHz (0=direct input)") p_spec.add_argument('--sr', type=int, default=20000, help="Symbol rate for measurement in ksps (default: 20000)") p_spec.add_argument('--waterfall', action='store_true', help="Waterfall display (time x frequency x power)") p_spec.add_argument('--sweeps', type=int, default=1, help="Number of sweeps (default: 1)") p_spec.add_argument('--threshold', type=float, default=3.0, help="Peak detection threshold in dB (default: 3)") p_spec.add_argument('--plot', action='store_true', help="Show matplotlib plot") p_spec.add_argument('--csv', metavar='FILE', help="Save sweep data to CSV") # scan p_scan = sub.add_parser('scan', help="Automated transponder scanner", formatter_class=argparse.RawDescriptionHelpFormatter) p_scan.add_argument('--start', type=float, default=950, help="Start IF frequency in MHz (default: 950)") p_scan.add_argument('--stop', type=float, default=2150, help="Stop IF frequency in MHz (default: 2150)") p_scan.add_argument('--threshold', type=float, default=3, help="Peak detection threshold in dB (default: 3)") p_scan.add_argument('--sr-min', type=int, default=1000, help="Minimum symbol rate in ksps (default: 1000)") p_scan.add_argument('--sr-max', type=int, default=30000, help="Maximum symbol rate in ksps (default: 30000)") p_scan.add_argument('--sr-step', type=int, default=500, help="Symbol rate step in ksps (default: 500)") p_scan.add_argument('--lnb-lo', type=float, default=9750, help="LNB LO frequency in MHz (default: 9750)") p_scan.add_argument('--pol', choices=['H', 'V', 'L', 'R'], help="Polarization (sets LNB voltage)") p_scan.add_argument('--band', choices=['low', 'high'], help="LNB band (sets 22 kHz tone)") p_scan.add_argument('--json', action='store_true', help="Output results as JSON") p_scan.add_argument('--csv', metavar='FILE', help="Save results to CSV") # monitor p_mon = sub.add_parser('monitor', help="Real-time signal monitor / dish alignment", formatter_class=argparse.RawDescriptionHelpFormatter) p_mon.add_argument('freq', type=float, help="Frequency in MHz (RF if --lnb-lo set, IF otherwise)") p_mon.add_argument('sr', type=int, help="Symbol rate in ksps") p_mon.add_argument('--pol', choices=['H', 'V', 'L', 'R'], help="Polarization (sets LNB voltage)") p_mon.add_argument('--band', choices=['low', 'high'], help="LNB band (sets 22 kHz tone)") p_mon.add_argument('--lnb-lo', type=float, default=0, help="LNB LO frequency in MHz (0=direct input)") p_mon.add_argument('--rate', type=float, default=10, help="Poll rate in Hz (default: 10, max ~50)") p_mon.add_argument('--audio', action='store_true', help="Pitch-proportional beep for hands-free alignment") p_mon.add_argument('--peak-hold', action='store_true', help="Track and display maximum signal seen") p_mon.add_argument('--history', type=int, default=60, help="Sparkline history length in samples (default: 60)") p_mon.add_argument('--plot', action='store_true', help="Show matplotlib plot after stopping") # lband p_lband = sub.add_parser('lband', help="L-band direct input analyzer", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ L-band mode uses direct input (no LNB) to monitor 950-2150 MHz. Can detect carrier PRESENCE at any frequency even if modulation is incompatible with the BCM4500 demodulator. Known allocations in range: 1240-1300 MHz Amateur 23cm 1525-1559 MHz Inmarsat downlink 1559-1610 MHz GNSS (GPS L1, Galileo E1) 1610-1626 MHz Iridium downlink 1670-1710 MHz MetSat (GOES LRIT, NOAA HRPT) 1710-1785 MHz LTE/AWS uplink 1920-2025 MHz UMTS uplink """) p_lband.add_argument('--start', type=float, default=950, help="Start frequency in MHz (default: 950)") p_lband.add_argument('--stop', type=float, default=2150, help="Stop frequency in MHz (default: 2150)") p_lband.add_argument('--step', type=float, default=2, help="Step size in MHz (default: 2)") p_lband.add_argument('--dwell', type=int, default=20, help="Dwell time per step in ms (default: 20)") p_lband.add_argument('--23cm', dest='ham_23cm', action='store_true', help="Narrow to 1240-1300 MHz with 500 kHz steps") p_lband.add_argument('--band-info', action='store_true', help="Print L-band allocation table") p_lband.add_argument('--waterfall', action='store_true', help="Waterfall display") p_lband.add_argument('--plot', action='store_true', help="Show matplotlib plot") p_lband.add_argument('--csv', metavar='FILE', help="Save sweep data to CSV") # track p_track = sub.add_parser('track', help="Carrier/beacon tracker with logging", formatter_class=argparse.RawDescriptionHelpFormatter) p_track.add_argument('freq', type=float, help="Frequency in MHz (RF if --lnb-lo set, IF otherwise)") p_track.add_argument('sr', type=int, help="Symbol rate in ksps") p_track.add_argument('--pol', choices=['H', 'V', 'L', 'R'], help="Polarization (sets LNB voltage)") p_track.add_argument('--band', choices=['low', 'high'], help="LNB band (sets 22 kHz tone)") p_track.add_argument('--lnb-lo', type=float, default=0, help="LNB LO frequency in MHz (0=direct input)") p_track.add_argument('--rate', type=float, default=1, help="Poll rate in Hz (default: 1)") p_track.add_argument('--duration', type=float, default=None, help="Tracking duration in seconds (default: until Ctrl-C)") p_track.add_argument('--log', metavar='FILE', help="Log CSV: timestamp, snr, agc, power, lock, status") p_track.add_argument('--drift-track', action='store_true', help="Periodically sweep narrow window to measure frequency drift") p_track.add_argument('--plot', action='store_true', help="Show matplotlib plot after stopping") p_track.add_argument('--json-lines', metavar='FILE', help="Log as JSON-lines (one JSON object per line)") return parser def main(): parser = build_parser() args = parser.parse_args() if not args.command: parser.print_help() sys.exit(0) dispatch = { 'spectrum': cmd_spectrum, 'scan': cmd_scan, 'monitor': cmd_monitor, 'lband': cmd_lband, 'track': cmd_track, } handler = dispatch.get(args.command) if handler is None: parser.print_help() sys.exit(1) with SkyWalker1(verbose=args.verbose) as sw: handler(sw, args) if __name__ == '__main__': main()