Apply .gitattributes normalization to convert all CRLF line endings inherited from Windows-origin source files to Unix LF. 175 files, zero content changes.
1087 lines
38 KiB
Python
Executable File
1087 lines
38 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Genpix SkyWalker-1 multi-mode RF tool.
|
|
|
|
Modes:
|
|
spectrum - Sweep spectrum analyzer (950-2150 MHz IF range)
|
|
scan - Automated transponder scanner (sweep + blind scan)
|
|
monitor - Real-time signal strength at a single frequency
|
|
lband - L-band direct input analyzer (no LNB)
|
|
track - Carrier/beacon tracker with logging
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import time
|
|
import signal
|
|
import csv
|
|
import json
|
|
import struct
|
|
import math
|
|
from datetime import datetime
|
|
|
|
# Add tools directory to path for library import
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from skywalker_lib import (
|
|
SkyWalker1, MODULATIONS, FEC_RATES, MOD_FEC_GROUP,
|
|
LNB_LO_LOW, LNB_LO_HIGH, LBAND_ALLOCATIONS,
|
|
CMD_BLIND_SCAN,
|
|
snr_raw_to_db, snr_raw_to_pct, agc_to_power_db,
|
|
detect_peaks, if_to_rf, signal_bar, format_config_bits,
|
|
)
|
|
|
|
|
|
# --- Terminal rendering ---
|
|
|
|
# ANSI color codes for waterfall display
|
|
WATERFALL_COLORS = [
|
|
"\033[38;5;17m", # dark blue (weakest)
|
|
"\033[38;5;19m",
|
|
"\033[38;5;21m",
|
|
"\033[38;5;27m",
|
|
"\033[38;5;33m",
|
|
"\033[38;5;39m", # cyan
|
|
"\033[38;5;46m", # green
|
|
"\033[38;5;82m",
|
|
"\033[38;5;118m",
|
|
"\033[38;5;154m", # yellow-green
|
|
"\033[38;5;190m",
|
|
"\033[38;5;226m", # yellow
|
|
"\033[38;5;214m",
|
|
"\033[38;5;208m", # orange
|
|
"\033[38;5;196m", # red (strongest)
|
|
"\033[38;5;160m",
|
|
]
|
|
ANSI_RESET = "\033[0m"
|
|
|
|
# Unicode block characters for bar charts
|
|
BARS_H = " ▏▎▍▌▋▊▉█"
|
|
|
|
# Sparkline characters
|
|
SPARKS = "▁▂▃▄▅▆▇█"
|
|
|
|
|
|
def power_color(power_db: float, floor: float = -40.0, ceil: float = 0.0) -> str:
|
|
"""Map a power_db value to an ANSI color escape."""
|
|
ratio = (power_db - floor) / (ceil - floor)
|
|
ratio = max(0.0, min(1.0, ratio))
|
|
idx = int(ratio * (len(WATERFALL_COLORS) - 1))
|
|
return WATERFALL_COLORS[idx]
|
|
|
|
|
|
def ascii_bar_h(value: float, max_val: float, width: int = 50) -> str:
|
|
"""Render a horizontal bar using Unicode block characters."""
|
|
if max_val == 0:
|
|
return ""
|
|
ratio = max(0.0, min(1.0, value / max_val))
|
|
full_blocks = int(ratio * width)
|
|
remainder = (ratio * width) - full_blocks
|
|
partial_idx = int(remainder * (len(BARS_H) - 1))
|
|
|
|
bar = "█" * full_blocks
|
|
if full_blocks < width:
|
|
bar += BARS_H[partial_idx]
|
|
bar += " " * (width - full_blocks - 1)
|
|
return bar
|
|
|
|
|
|
def sparkline(values: list, width: int = 60) -> str:
|
|
"""Render a sparkline from a list of values."""
|
|
if not values:
|
|
return ""
|
|
# Take last 'width' values
|
|
vals = values[-width:]
|
|
mn = min(vals)
|
|
mx = max(vals)
|
|
rng = mx - mn if mx != mn else 1.0
|
|
return "".join(
|
|
SPARKS[min(len(SPARKS) - 1, int((v - mn) / rng * (len(SPARKS) - 1)))]
|
|
for v in vals
|
|
)
|
|
|
|
|
|
def clear_line():
|
|
"""Clear the current terminal line."""
|
|
sys.stdout.write("\r\033[K")
|
|
|
|
|
|
# --- Mode: spectrum ---
|
|
|
|
def cmd_spectrum(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Sweep 950-2150 MHz (or custom range), display power-vs-frequency."""
|
|
start = args.start
|
|
stop = args.stop
|
|
step = args.step
|
|
dwell = args.dwell
|
|
sr_ksps = args.sr
|
|
lnb_lo = args.lnb_lo
|
|
num_sweeps = args.sweeps
|
|
|
|
steps = int((stop - start) / step) + 1
|
|
est_time = steps * (dwell + 2) / 1000.0 # dwell + USB overhead
|
|
|
|
print(f"SkyWalker-1 Spectrum Analyzer")
|
|
print(f"{'=' * 60}")
|
|
print(f" IF Range: {start}-{stop} MHz (step {step} MHz)")
|
|
if lnb_lo > 0:
|
|
print(f" RF Range: {start + lnb_lo:.0f}-{stop + lnb_lo:.0f} MHz (LNB LO {lnb_lo} MHz)")
|
|
else:
|
|
print(f" Direct input (no LNB offset)")
|
|
print(f" Steps: {steps}")
|
|
print(f" Dwell: {dwell} ms")
|
|
print(f" Symbol rate: {sr_ksps} ksps")
|
|
print(f" Est. sweep: {est_time:.1f}s")
|
|
if num_sweeps > 1:
|
|
print(f" Sweeps: {num_sweeps}")
|
|
print()
|
|
|
|
sw.ensure_booted()
|
|
|
|
csv_writer = None
|
|
csv_file = None
|
|
if args.csv:
|
|
csv_file = open(args.csv, 'w', newline='')
|
|
csv_writer = csv.writer(csv_file)
|
|
csv_writer.writerow(["sweep", "if_mhz", "rf_mhz", "snr_raw", "snr_db",
|
|
"agc1", "agc2", "power_db", "locked"])
|
|
|
|
all_sweeps = []
|
|
|
|
for sweep_num in range(num_sweeps):
|
|
if num_sweeps > 1:
|
|
print(f"\n--- Sweep {sweep_num + 1}/{num_sweeps} ---")
|
|
|
|
def progress(freq, step_num, total, result):
|
|
pct = (step_num + 1) / total * 100
|
|
rf = if_to_rf(freq, lnb_lo)
|
|
clear_line()
|
|
sys.stdout.write(f" [{pct:5.1f}%] {freq:.0f} MHz IF"
|
|
f" ({rf:.0f} MHz RF)"
|
|
f" SNR={result['snr_db']:.1f} dB"
|
|
f" AGC={result['agc1']}")
|
|
sys.stdout.flush()
|
|
|
|
t0 = time.time()
|
|
freqs, powers, results = sw.sweep_spectrum(
|
|
start, stop, step, dwell, sr_ksps,
|
|
callback=progress if not args.waterfall else None
|
|
)
|
|
elapsed = time.time() - t0
|
|
clear_line()
|
|
print(f" Sweep complete: {len(freqs)} points in {elapsed:.1f}s")
|
|
|
|
all_sweeps.append((freqs, powers, results))
|
|
|
|
# Write CSV
|
|
if csv_writer:
|
|
for i, (f, p, r) in enumerate(zip(freqs, powers, results)):
|
|
rf = if_to_rf(f, lnb_lo)
|
|
csv_writer.writerow([
|
|
sweep_num, f"{f:.1f}", f"{rf:.1f}",
|
|
r["snr_raw"], f"{r['snr_db']:.2f}",
|
|
r["agc1"], r["agc2"],
|
|
f"{p:.2f}", int(r["locked"])
|
|
])
|
|
|
|
# Terminal bar chart display
|
|
if not args.waterfall:
|
|
print()
|
|
p_min = min(powers) if powers else -40
|
|
p_max = max(powers) if powers else 0
|
|
p_range = p_max - p_min if p_max != p_min else 1
|
|
|
|
for i, (f, p) in enumerate(zip(freqs, powers)):
|
|
rf = if_to_rf(f, lnb_lo)
|
|
bar = ascii_bar_h(p - p_min, p_range, width=40)
|
|
label = f"{rf:7.0f}" if lnb_lo > 0 else f"{f:7.0f}"
|
|
locked = results[i]["locked"]
|
|
lock_mark = " *" if locked else ""
|
|
print(f" {label} |{bar}| {p:6.1f} dB{lock_mark}")
|
|
|
|
# Waterfall display
|
|
if args.waterfall:
|
|
p_min = min(powers) if powers else -40
|
|
p_max = max(powers) if powers else 0
|
|
line = ""
|
|
for p in powers:
|
|
color = power_color(p, p_min, p_max)
|
|
line += f"{color}█{ANSI_RESET}"
|
|
ts = datetime.now().strftime("%H:%M:%S")
|
|
print(f" {ts} {line}")
|
|
|
|
# Peak detection
|
|
if not args.waterfall and freqs:
|
|
peaks = detect_peaks(freqs, powers, threshold_db=args.threshold if hasattr(args, 'threshold') else 3.0)
|
|
if peaks:
|
|
print(f"\n Peaks ({len(peaks)} found):")
|
|
for freq, pwr, idx in peaks:
|
|
rf = if_to_rf(freq, lnb_lo)
|
|
locked = results[idx]["locked"]
|
|
lock_str = " LOCKED" if locked else ""
|
|
label = f"{rf:.0f} MHz RF" if lnb_lo > 0 else f"{freq:.0f} MHz"
|
|
print(f" {label} {pwr:.1f} dB{lock_str}")
|
|
|
|
if csv_file:
|
|
csv_file.close()
|
|
print(f"\n CSV saved: {args.csv}")
|
|
|
|
# Matplotlib plot
|
|
if args.plot:
|
|
_plot_spectrum(freqs, powers, lnb_lo, all_sweeps)
|
|
|
|
|
|
def _plot_spectrum(freqs, powers, lnb_lo, all_sweeps=None):
|
|
"""Show matplotlib spectrum plot."""
|
|
try:
|
|
import matplotlib.pyplot as plt
|
|
except ImportError:
|
|
print(" matplotlib required for --plot: pip install matplotlib")
|
|
return
|
|
|
|
rf_freqs = [if_to_rf(f, lnb_lo) for f in freqs]
|
|
x_label = "Frequency (MHz RF)" if lnb_lo > 0 else "Frequency (MHz IF)"
|
|
|
|
fig, ax = plt.subplots(figsize=(14, 6))
|
|
ax.plot(rf_freqs, powers, '-', linewidth=0.8, color='#2196F3')
|
|
ax.fill_between(rf_freqs, min(powers), powers, alpha=0.15, color='#2196F3')
|
|
ax.set_xlabel(x_label)
|
|
ax.set_ylabel("Power (dB, relative)")
|
|
ax.set_title("SkyWalker-1 Spectrum")
|
|
ax.grid(True, alpha=0.3)
|
|
|
|
# Mark peaks
|
|
peaks = detect_peaks(freqs, powers)
|
|
if peaks:
|
|
peak_x = [if_to_rf(f, lnb_lo) for f, _, _ in peaks]
|
|
peak_y = [p for _, p, _ in peaks]
|
|
ax.scatter(peak_x, peak_y, color='#F44336', zorder=5, s=30)
|
|
for px, py in zip(peak_x, peak_y):
|
|
ax.annotate(f"{px:.0f}", (px, py), textcoords="offset points",
|
|
xytext=(0, 8), ha='center', fontsize=7, color='#F44336')
|
|
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
|
|
# --- Mode: scan ---
|
|
|
|
def cmd_scan(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Automated transponder scanner: coarse sweep, peak detect, blind scan."""
|
|
start = args.start
|
|
stop = args.stop
|
|
lnb_lo = args.lnb_lo
|
|
threshold = args.threshold
|
|
sr_min = args.sr_min * 1000
|
|
sr_max = args.sr_max * 1000
|
|
sr_step = args.sr_step * 1000
|
|
|
|
print(f"SkyWalker-1 Transponder Scanner")
|
|
print(f"{'=' * 60}")
|
|
print(f" IF Range: {start}-{stop} MHz")
|
|
if lnb_lo > 0:
|
|
print(f" RF Range: {start + lnb_lo:.0f}-{stop + lnb_lo:.0f} MHz")
|
|
print(f" Threshold: {threshold} dB above noise floor")
|
|
print(f" SR Range: {args.sr_min}-{args.sr_max} ksps (step {args.sr_step})")
|
|
print()
|
|
|
|
sw.ensure_booted()
|
|
|
|
# Configure LNB
|
|
if args.pol:
|
|
sw.set_lnb_voltage(args.pol.upper() in ("H", "L"))
|
|
if args.band:
|
|
sw.set_22khz_tone(args.band == "high")
|
|
|
|
# Phase 1: Coarse spectrum sweep
|
|
print("[Phase 1] Coarse spectrum sweep...")
|
|
coarse_step = 10 # MHz
|
|
freqs, powers, results = sw.sweep_spectrum(
|
|
start, stop, coarse_step, dwell_ms=15, sr_ksps=20000
|
|
)
|
|
print(f" {len(freqs)} points measured")
|
|
|
|
# Phase 2: Find peaks
|
|
print(f"\n[Phase 2] Peak detection (threshold {threshold} dB)...")
|
|
peaks = detect_peaks(freqs, powers, threshold_db=threshold)
|
|
if not peaks:
|
|
print(" No peaks found above threshold.")
|
|
print(" Try lowering --threshold or checking dish alignment.")
|
|
return
|
|
|
|
print(f" {len(peaks)} candidate peaks:")
|
|
for freq, pwr, idx in peaks:
|
|
rf = if_to_rf(freq, lnb_lo)
|
|
print(f" {rf:.0f} MHz {pwr:.1f} dB")
|
|
|
|
# Phase 2.5: Fine sweep around each peak
|
|
print(f"\n[Phase 2.5] Fine sweep around peaks...")
|
|
refined_peaks = []
|
|
for freq, pwr, idx in peaks:
|
|
fine_start = max(start, freq - 15)
|
|
fine_stop = min(stop, freq + 15)
|
|
fine_freqs, fine_powers, fine_results = sw.sweep_spectrum(
|
|
fine_start, fine_stop, step_mhz=2.0, dwell_ms=20, sr_ksps=20000
|
|
)
|
|
if fine_powers:
|
|
best_idx = fine_powers.index(max(fine_powers))
|
|
refined_peaks.append((
|
|
fine_freqs[best_idx],
|
|
fine_powers[best_idx],
|
|
fine_results[best_idx]
|
|
))
|
|
rf = if_to_rf(fine_freqs[best_idx], lnb_lo)
|
|
print(f" {rf:.0f} MHz {fine_powers[best_idx]:.1f} dB (refined)")
|
|
|
|
# Phase 3: Blind scan at each refined peak
|
|
print(f"\n[Phase 3] Blind scan at {len(refined_peaks)} peaks...")
|
|
found = []
|
|
|
|
for freq, pwr, result in refined_peaks:
|
|
freq_khz_int = int(freq * 1000)
|
|
rf = if_to_rf(freq, lnb_lo)
|
|
print(f" Scanning {rf:.0f} MHz (IF {freq:.0f})...", end="", flush=True)
|
|
|
|
# Build blind scan EP0 payload
|
|
payload = struct.pack('<IIII', freq_khz_int, sr_min, sr_max, sr_step)
|
|
sw._vendor_out(CMD_BLIND_SCAN, data=payload)
|
|
|
|
# Read result
|
|
try:
|
|
resp = sw._vendor_in(CMD_BLIND_SCAN, length=8)
|
|
if len(resp) >= 8 and resp[0] != 0:
|
|
found_freq = struct.unpack_from('<I', resp, 0)[0]
|
|
found_sr = struct.unpack_from('<I', resp, 4)[0]
|
|
print(f" LOCKED SR={found_sr // 1000} ksps")
|
|
found.append({
|
|
"if_mhz": found_freq / 1000.0,
|
|
"rf_mhz": if_to_rf(found_freq / 1000.0, lnb_lo),
|
|
"sr_ksps": found_sr // 1000,
|
|
"sr_sps": found_sr,
|
|
"power_db": pwr,
|
|
})
|
|
else:
|
|
print(" no lock")
|
|
except Exception:
|
|
print(" error")
|
|
|
|
# Report
|
|
print(f"\n{'=' * 60}")
|
|
print(f"Scan complete: {len(found)} transponder(s) found\n")
|
|
for tp in found:
|
|
print(f" {tp['rf_mhz']:.0f} MHz SR {tp['sr_ksps']} ksps {tp['power_db']:.1f} dB")
|
|
|
|
if args.json:
|
|
print(f"\n{json.dumps(found, indent=2)}")
|
|
|
|
if args.csv:
|
|
with open(args.csv, 'w', newline='') as f:
|
|
w = csv.writer(f)
|
|
w.writerow(["if_mhz", "rf_mhz", "sr_ksps", "power_db"])
|
|
for tp in found:
|
|
w.writerow([tp["if_mhz"], tp["rf_mhz"], tp["sr_ksps"], tp["power_db"]])
|
|
print(f" CSV saved: {args.csv}")
|
|
|
|
|
|
# --- Mode: monitor ---
|
|
|
|
def cmd_monitor(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Real-time signal monitor / dish alignment aid."""
|
|
freq_mhz = args.freq
|
|
sr_ksps = args.sr
|
|
lnb_lo = args.lnb_lo
|
|
rate = args.rate
|
|
poll_interval = 1.0 / rate
|
|
|
|
# Calculate IF frequency
|
|
if lnb_lo > 0:
|
|
if_mhz = freq_mhz - lnb_lo
|
|
else:
|
|
if_mhz = freq_mhz
|
|
|
|
if_khz = int(if_mhz * 1000)
|
|
sr_sps = sr_ksps * 1000
|
|
|
|
print(f"SkyWalker-1 Signal Monitor")
|
|
print(f"{'=' * 60}")
|
|
if lnb_lo > 0:
|
|
print(f" Frequency: {freq_mhz} MHz (IF {if_mhz:.0f} MHz, LNB LO {lnb_lo} MHz)")
|
|
else:
|
|
print(f" Frequency: {if_mhz} MHz (direct input)")
|
|
print(f" Symbol rate: {sr_ksps} ksps")
|
|
print(f" Poll rate: {rate} Hz")
|
|
if args.audio:
|
|
print(f" Audio: ON")
|
|
print(f"\n Press Ctrl-C to stop\n")
|
|
|
|
sw.ensure_booted()
|
|
|
|
# Configure LNB
|
|
if args.pol:
|
|
sw.set_lnb_voltage(args.pol.upper() in ("H", "L"))
|
|
if args.band:
|
|
sw.set_22khz_tone(args.band == "high")
|
|
|
|
# Initial tune
|
|
sw.tune(sr_sps, if_khz, 0, 5) # QPSK, auto-FEC
|
|
time.sleep(0.5)
|
|
|
|
history = []
|
|
peak_snr = 0.0
|
|
peak_power = -99.0
|
|
running = True
|
|
|
|
def stop(signum, frame):
|
|
nonlocal running
|
|
running = False
|
|
|
|
signal.signal(signal.SIGINT, stop)
|
|
signal.signal(signal.SIGTERM, stop)
|
|
|
|
while running:
|
|
t0 = time.time()
|
|
|
|
sig = sw.signal_monitor()
|
|
snr_db = sig["snr_db"]
|
|
power = sig["power_db"]
|
|
locked = sig["locked"]
|
|
agc1 = sig["agc1"]
|
|
|
|
history.append(snr_db)
|
|
if args.peak_hold:
|
|
peak_snr = max(peak_snr, snr_db)
|
|
peak_power = max(peak_power, power)
|
|
|
|
# Build display
|
|
lock_str = "LOCK" if locked else "----"
|
|
bar = signal_bar(sig["snr_pct"], width=35)
|
|
|
|
clear_line()
|
|
line = f" [{lock_str}] SNR {snr_db:5.1f} dB AGC {agc1:5d} {bar}"
|
|
if args.peak_hold:
|
|
line += f" peak {peak_snr:.1f} dB"
|
|
sys.stdout.write(line)
|
|
|
|
# Sparkline history
|
|
if len(history) > 1:
|
|
spark = sparkline(history, width=min(40, len(history)))
|
|
sys.stdout.write(f"\n History: {spark}")
|
|
sys.stdout.write("\033[F") # cursor up
|
|
|
|
sys.stdout.flush()
|
|
|
|
# Audio feedback
|
|
if args.audio:
|
|
_beep_proportional(snr_db)
|
|
|
|
# Pace the polling
|
|
elapsed = time.time() - t0
|
|
sleep_time = poll_interval - elapsed
|
|
if sleep_time > 0:
|
|
time.sleep(sleep_time)
|
|
|
|
print(f"\n\n Stopped. {len(history)} samples collected.")
|
|
if args.peak_hold:
|
|
print(f" Peak SNR: {peak_snr:.1f} dB")
|
|
|
|
if args.plot:
|
|
_plot_monitor_history(history, rate)
|
|
|
|
|
|
def _beep_proportional(snr_db: float):
|
|
"""Emit a pitch-proportional beep for dish alignment."""
|
|
# Map SNR 0-15 dB to frequency 200-2000 Hz
|
|
freq = int(200 + min(15, max(0, snr_db)) / 15 * 1800)
|
|
duration_ms = 50
|
|
try:
|
|
sys.stdout.write(f"\033[10;{freq}]\033[11;{duration_ms}]\a")
|
|
sys.stdout.flush()
|
|
except Exception:
|
|
pass # Terminal doesn't support bell frequency control
|
|
|
|
|
|
def _plot_monitor_history(history, rate):
|
|
"""Plot signal monitor history with matplotlib."""
|
|
try:
|
|
import matplotlib.pyplot as plt
|
|
except ImportError:
|
|
print(" matplotlib required for --plot: pip install matplotlib")
|
|
return
|
|
|
|
t = [i / rate for i in range(len(history))]
|
|
fig, ax = plt.subplots(figsize=(12, 5))
|
|
ax.plot(t, history, '-', linewidth=0.8, color='#4CAF50')
|
|
ax.set_xlabel("Time (s)")
|
|
ax.set_ylabel("SNR (dB)")
|
|
ax.set_title("Signal Monitor History")
|
|
ax.grid(True, alpha=0.3)
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
|
|
# --- Mode: lband ---
|
|
|
|
def cmd_lband(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""L-band direct input spectrum analyzer with allocation annotations."""
|
|
start = args.start
|
|
stop = args.stop
|
|
step = args.step
|
|
dwell = args.dwell
|
|
|
|
# Narrow to 23cm if requested
|
|
if args.ham_23cm:
|
|
start = 1240
|
|
stop = 1300
|
|
step = 0.5
|
|
|
|
steps = int((stop - start) / step) + 1
|
|
est_time = steps * (dwell + 2) / 1000.0
|
|
|
|
print(f"SkyWalker-1 L-Band Analyzer")
|
|
print(f"{'=' * 60}")
|
|
print(f" Range: {start}-{stop} MHz (direct input, no LNB)")
|
|
print(f" Steps: {steps} (step {step} MHz, dwell {dwell} ms)")
|
|
print(f" Est. sweep: {est_time:.1f}s")
|
|
print()
|
|
print(" NOTE: Can detect carrier PRESENCE at any frequency even if")
|
|
print(" modulation is incompatible with the BCM4500 demodulator.")
|
|
print()
|
|
|
|
sw.ensure_booted()
|
|
|
|
# Disable LNB power for direct input
|
|
sw.start_intersil(on=False)
|
|
time.sleep(0.1)
|
|
|
|
# Show band info
|
|
if args.band_info:
|
|
print(f" L-Band Allocations in range:")
|
|
for lo, hi, name in LBAND_ALLOCATIONS:
|
|
if lo < stop and hi > start:
|
|
overlap_lo = max(lo, start)
|
|
overlap_hi = min(hi, stop)
|
|
print(f" {overlap_lo:7.0f}-{overlap_hi:<7.0f} MHz {name}")
|
|
print()
|
|
|
|
# Sweep
|
|
def progress(freq, step_num, total, result):
|
|
pct = (step_num + 1) / total * 100
|
|
clear_line()
|
|
sys.stdout.write(f" [{pct:5.1f}%] {freq:.1f} MHz"
|
|
f" SNR={result['snr_db']:.1f} dB"
|
|
f" AGC={result['agc1']}")
|
|
sys.stdout.flush()
|
|
|
|
t0 = time.time()
|
|
freqs, powers, results = sw.sweep_spectrum(
|
|
start, stop, step, dwell, sr_ksps=20000,
|
|
callback=progress if not args.waterfall else None
|
|
)
|
|
elapsed = time.time() - t0
|
|
clear_line()
|
|
print(f" Sweep complete: {len(freqs)} points in {elapsed:.1f}s\n")
|
|
|
|
# CSV output
|
|
csv_writer = None
|
|
csv_file = None
|
|
if args.csv:
|
|
csv_file = open(args.csv, 'w', newline='')
|
|
csv_writer = csv.writer(csv_file)
|
|
csv_writer.writerow(["freq_mhz", "snr_raw", "snr_db", "agc1", "agc2",
|
|
"power_db", "locked", "allocation"])
|
|
for i, (f, p, r) in enumerate(zip(freqs, powers, results)):
|
|
alloc = _freq_allocation(f)
|
|
csv_writer.writerow([
|
|
f"{f:.1f}", r["snr_raw"], f"{r['snr_db']:.2f}",
|
|
r["agc1"], r["agc2"], f"{p:.2f}", int(r["locked"]),
|
|
alloc
|
|
])
|
|
csv_file.close()
|
|
print(f" CSV saved: {args.csv}")
|
|
|
|
# Terminal display with allocation annotations
|
|
if not args.waterfall:
|
|
p_min = min(powers) if powers else -40
|
|
p_max = max(powers) if powers else 0
|
|
p_range = p_max - p_min if p_max != p_min else 1
|
|
last_alloc = ""
|
|
|
|
for i, (f, p) in enumerate(zip(freqs, powers)):
|
|
bar = ascii_bar_h(p - p_min, p_range, width=35)
|
|
locked = results[i]["locked"]
|
|
lock_mark = " *" if locked else ""
|
|
alloc = _freq_allocation(f)
|
|
|
|
# Print allocation header when it changes
|
|
if alloc != last_alloc and alloc:
|
|
print(f" {'─' * 55}")
|
|
print(f" ┌ {alloc}")
|
|
last_alloc = alloc
|
|
elif alloc != last_alloc:
|
|
last_alloc = alloc
|
|
|
|
print(f" {f:7.1f} |{bar}| {p:6.1f} dB{lock_mark}")
|
|
|
|
# Waterfall
|
|
if args.waterfall:
|
|
p_min = min(powers) if powers else -40
|
|
p_max = max(powers) if powers else 0
|
|
line = ""
|
|
for p in powers:
|
|
color = power_color(p, p_min, p_max)
|
|
line += f"{color}█{ANSI_RESET}"
|
|
ts = datetime.now().strftime("%H:%M:%S")
|
|
print(f" {ts} {line}")
|
|
|
|
# Peaks
|
|
peaks = detect_peaks(freqs, powers, threshold_db=3.0)
|
|
if peaks:
|
|
print(f"\n Peaks ({len(peaks)} found):")
|
|
for freq, pwr, idx in peaks:
|
|
alloc = _freq_allocation(freq)
|
|
alloc_str = f" [{alloc}]" if alloc else ""
|
|
locked = results[idx]["locked"]
|
|
lock_str = " LOCKED" if locked else ""
|
|
print(f" {freq:.1f} MHz {pwr:.1f} dB{lock_str}{alloc_str}")
|
|
|
|
if args.plot:
|
|
_plot_lband(freqs, powers)
|
|
|
|
|
|
def _freq_allocation(freq_mhz: float) -> str:
|
|
"""Return the allocation name for a given frequency, or empty string."""
|
|
for lo, hi, name in LBAND_ALLOCATIONS:
|
|
if lo <= freq_mhz <= hi:
|
|
return name
|
|
return ""
|
|
|
|
|
|
def _plot_lband(freqs, powers):
|
|
"""L-band spectrum plot with allocation shading."""
|
|
try:
|
|
import matplotlib.pyplot as plt
|
|
except ImportError:
|
|
print(" matplotlib required for --plot: pip install matplotlib")
|
|
return
|
|
|
|
fig, ax = plt.subplots(figsize=(14, 6))
|
|
ax.plot(freqs, powers, '-', linewidth=0.8, color='#FF9800')
|
|
ax.fill_between(freqs, min(powers), powers, alpha=0.1, color='#FF9800')
|
|
|
|
# Shade allocations
|
|
colors = ['#E3F2FD', '#FFF3E0', '#E8F5E9', '#F3E5F5',
|
|
'#FBE9E7', '#E0F7FA', '#ECEFF1']
|
|
for i, (lo, hi, name) in enumerate(LBAND_ALLOCATIONS):
|
|
if lo < max(freqs) and hi > min(freqs):
|
|
c = colors[i % len(colors)]
|
|
ax.axvspan(lo, hi, alpha=0.3, color=c, label=name)
|
|
mid = (lo + hi) / 2
|
|
ax.text(mid, max(powers) * 0.95, name,
|
|
ha='center', fontsize=6, rotation=45)
|
|
|
|
ax.set_xlabel("Frequency (MHz)")
|
|
ax.set_ylabel("Power (dB, relative)")
|
|
ax.set_title("SkyWalker-1 L-Band Spectrum")
|
|
ax.grid(True, alpha=0.3)
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
|
|
# --- Mode: track ---
|
|
|
|
def cmd_track(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Lock to a frequency and log power/lock/status over time."""
|
|
freq_mhz = args.freq
|
|
sr_ksps = args.sr
|
|
lnb_lo = args.lnb_lo
|
|
rate = args.rate
|
|
duration = args.duration
|
|
poll_interval = 1.0 / rate
|
|
|
|
if lnb_lo > 0:
|
|
if_mhz = freq_mhz - lnb_lo
|
|
else:
|
|
if_mhz = freq_mhz
|
|
|
|
if_khz = int(if_mhz * 1000)
|
|
sr_sps = sr_ksps * 1000
|
|
|
|
print(f"SkyWalker-1 Carrier Tracker")
|
|
print(f"{'=' * 60}")
|
|
if lnb_lo > 0:
|
|
print(f" Frequency: {freq_mhz} MHz (IF {if_mhz:.0f} MHz)")
|
|
else:
|
|
print(f" Frequency: {if_mhz} MHz (direct)")
|
|
print(f" Symbol rate: {sr_ksps} ksps")
|
|
print(f" Poll rate: {rate} Hz")
|
|
if duration:
|
|
print(f" Duration: {duration}s")
|
|
if args.log:
|
|
print(f" Log file: {args.log}")
|
|
print(f"\n Press Ctrl-C to stop\n")
|
|
|
|
sw.ensure_booted()
|
|
|
|
if args.pol:
|
|
sw.set_lnb_voltage(args.pol.upper() in ("H", "L"))
|
|
if args.band:
|
|
sw.set_22khz_tone(args.band == "high")
|
|
|
|
# Tune once
|
|
sw.tune(sr_sps, if_khz, 0, 5)
|
|
time.sleep(0.5)
|
|
|
|
# Setup logging
|
|
log_file = None
|
|
log_writer = None
|
|
if args.log:
|
|
log_file = open(args.log, 'w', newline='')
|
|
log_writer = csv.writer(log_file)
|
|
log_writer.writerow(["timestamp", "elapsed_s", "snr_db", "agc1", "agc2",
|
|
"power_db", "locked", "lock_reg", "status_reg"])
|
|
|
|
jsonl_file = None
|
|
if args.json_lines:
|
|
jsonl_file = open(args.json_lines, 'w')
|
|
|
|
history_snr = []
|
|
history_power = []
|
|
was_locked = None
|
|
sample_count = 0
|
|
start_time = time.time()
|
|
running = True
|
|
|
|
def stop_handler(signum, frame):
|
|
nonlocal running
|
|
running = False
|
|
|
|
signal.signal(signal.SIGINT, stop_handler)
|
|
signal.signal(signal.SIGTERM, stop_handler)
|
|
|
|
# Drift tracking window
|
|
drift_window = 5 # MHz each side
|
|
drift_history = []
|
|
|
|
while running:
|
|
if duration and (time.time() - start_time) >= duration:
|
|
break
|
|
|
|
t0 = time.time()
|
|
elapsed = t0 - start_time
|
|
|
|
sig = sw.signal_monitor()
|
|
snr_db = sig["snr_db"]
|
|
power = sig["power_db"]
|
|
locked = sig["locked"]
|
|
|
|
history_snr.append(snr_db)
|
|
history_power.append(power)
|
|
sample_count += 1
|
|
|
|
# Lock state transitions
|
|
if was_locked is not None and locked != was_locked:
|
|
ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
|
if locked:
|
|
print(f"\n [{ts}] >>> LOCK ACQUIRED SNR {snr_db:.1f} dB")
|
|
else:
|
|
print(f"\n [{ts}] <<< LOCK LOST")
|
|
was_locked = locked
|
|
|
|
# Drift tracking
|
|
if args.drift_track and sample_count % (rate * 5) == 0:
|
|
# Every 5 seconds, do a narrow sweep to find peak
|
|
drift_start = max(950, if_mhz - drift_window)
|
|
drift_stop = min(2150, if_mhz + drift_window)
|
|
drift_freqs, drift_powers, _ = sw.sweep_spectrum(
|
|
drift_start, drift_stop, step_mhz=1.0, dwell_ms=5, sr_ksps=sr_ksps
|
|
)
|
|
if drift_powers:
|
|
peak_idx = drift_powers.index(max(drift_powers))
|
|
peak_freq = drift_freqs[peak_idx]
|
|
drift_hz = (peak_freq - if_mhz) * 1000
|
|
drift_history.append((elapsed, drift_hz))
|
|
if abs(drift_hz) > 100:
|
|
print(f"\n DRIFT: {drift_hz:+.0f} kHz from center")
|
|
# Re-tune to original
|
|
sw.tune(sr_sps, if_khz, 0, 5)
|
|
time.sleep(0.1)
|
|
|
|
# Display
|
|
lock_str = "LOCK" if locked else "----"
|
|
bar = signal_bar(sig["snr_pct"], width=30)
|
|
clear_line()
|
|
sys.stdout.write(f" [{lock_str}] {elapsed:7.1f}s SNR {snr_db:5.1f} dB {bar}"
|
|
f" #{sample_count}")
|
|
sys.stdout.flush()
|
|
|
|
# Log
|
|
ts_iso = datetime.now().isoformat()
|
|
if log_writer:
|
|
log_writer.writerow([
|
|
ts_iso, f"{elapsed:.3f}", f"{snr_db:.2f}",
|
|
sig["agc1"], sig["agc2"], f"{power:.2f}",
|
|
int(locked), sig["lock"], sig["status"]
|
|
])
|
|
|
|
if jsonl_file:
|
|
record = {
|
|
"ts": ts_iso,
|
|
"elapsed": round(elapsed, 3),
|
|
"snr_db": round(snr_db, 2),
|
|
"agc1": sig["agc1"],
|
|
"agc2": sig["agc2"],
|
|
"power_db": round(power, 2),
|
|
"locked": locked,
|
|
}
|
|
jsonl_file.write(json.dumps(record) + "\n")
|
|
|
|
# Pace
|
|
sleep_time = poll_interval - (time.time() - t0)
|
|
if sleep_time > 0:
|
|
time.sleep(sleep_time)
|
|
|
|
total_time = time.time() - start_time
|
|
print(f"\n\n Stopped. {sample_count} samples in {total_time:.1f}s")
|
|
|
|
if log_file:
|
|
log_file.close()
|
|
print(f" Log saved: {args.log}")
|
|
if jsonl_file:
|
|
jsonl_file.close()
|
|
print(f" JSON-lines saved: {args.json_lines}")
|
|
|
|
if args.plot:
|
|
_plot_track(history_snr, history_power, rate, drift_history)
|
|
|
|
|
|
def _plot_track(history_snr, history_power, rate, drift_history=None):
|
|
"""Plot tracking history."""
|
|
try:
|
|
import matplotlib.pyplot as plt
|
|
except ImportError:
|
|
print(" matplotlib required for --plot: pip install matplotlib")
|
|
return
|
|
|
|
t = [i / rate for i in range(len(history_snr))]
|
|
|
|
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
|
|
|
|
axes[0].plot(t, history_snr, '-', linewidth=0.6, color='#4CAF50')
|
|
axes[0].set_ylabel("SNR (dB)")
|
|
axes[0].set_title("Carrier Tracking")
|
|
axes[0].grid(True, alpha=0.3)
|
|
|
|
axes[1].plot(t, history_power, '-', linewidth=0.6, color='#2196F3')
|
|
axes[1].set_ylabel("Power (dB)")
|
|
axes[1].set_xlabel("Time (s)")
|
|
axes[1].grid(True, alpha=0.3)
|
|
|
|
if drift_history:
|
|
ax_drift = axes[0].twinx()
|
|
dt = [d[0] for d in drift_history]
|
|
dv = [d[1] for d in drift_history]
|
|
ax_drift.plot(dt, dv, 'o-', color='#F44336', markersize=3, linewidth=0.8)
|
|
ax_drift.set_ylabel("Drift (kHz)", color='#F44336')
|
|
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
|
|
# --- CLI ---
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
prog="skywalker.py",
|
|
description="Genpix SkyWalker-1 multi-mode RF tool",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""\
|
|
examples:
|
|
%(prog)s spectrum --start 950 --stop 2150 --step 5
|
|
%(prog)s spectrum --lnb-lo 9750 --start 950 --stop 2150 --plot
|
|
%(prog)s scan --lnb-lo 9750 --pol H --band high
|
|
%(prog)s monitor 12520 27500 --lnb-lo 9750 --pol H --rate 10
|
|
%(prog)s lband --band-info
|
|
%(prog)s lband --23cm --plot
|
|
%(prog)s track 12520 27500 --lnb-lo 9750 --log signal.csv --duration 60
|
|
|
|
RF coverage (with different LNB configurations):
|
|
No LNB (direct): 950-2150 MHz (L-band: ham 23cm, GPS, GOES, HRPT)
|
|
Ku LNB (9750 LO): 10700-11900 MHz (satellite TV low band)
|
|
Ku LNB (10600 LO): 11550-12750 MHz (satellite TV high band)
|
|
Custom (9000 LO): 9950-11150 MHz (QO-100 DATV @ ~1491 MHz IF)
|
|
""")
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
help="Show raw USB traffic")
|
|
|
|
sub = parser.add_subparsers(dest='command')
|
|
|
|
# spectrum
|
|
p_spec = sub.add_parser('spectrum', help="Sweep spectrum analyzer",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
p_spec.add_argument('--start', type=float, default=950,
|
|
help="Start IF frequency in MHz (default: 950)")
|
|
p_spec.add_argument('--stop', type=float, default=2150,
|
|
help="Stop IF frequency in MHz (default: 2150)")
|
|
p_spec.add_argument('--step', type=float, default=5,
|
|
help="Step size in MHz (default: 5)")
|
|
p_spec.add_argument('--dwell', type=int, default=10,
|
|
help="Dwell time per step in ms (default: 10)")
|
|
p_spec.add_argument('--lnb-lo', type=float, default=0,
|
|
help="LNB LO frequency in MHz (0=direct input)")
|
|
p_spec.add_argument('--sr', type=int, default=20000,
|
|
help="Symbol rate for measurement in ksps (default: 20000)")
|
|
p_spec.add_argument('--waterfall', action='store_true',
|
|
help="Waterfall display (time x frequency x power)")
|
|
p_spec.add_argument('--sweeps', type=int, default=1,
|
|
help="Number of sweeps (default: 1)")
|
|
p_spec.add_argument('--threshold', type=float, default=3.0,
|
|
help="Peak detection threshold in dB (default: 3)")
|
|
p_spec.add_argument('--plot', action='store_true',
|
|
help="Show matplotlib plot")
|
|
p_spec.add_argument('--csv', metavar='FILE',
|
|
help="Save sweep data to CSV")
|
|
|
|
# scan
|
|
p_scan = sub.add_parser('scan', help="Automated transponder scanner",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
p_scan.add_argument('--start', type=float, default=950,
|
|
help="Start IF frequency in MHz (default: 950)")
|
|
p_scan.add_argument('--stop', type=float, default=2150,
|
|
help="Stop IF frequency in MHz (default: 2150)")
|
|
p_scan.add_argument('--threshold', type=float, default=3,
|
|
help="Peak detection threshold in dB (default: 3)")
|
|
p_scan.add_argument('--sr-min', type=int, default=1000,
|
|
help="Minimum symbol rate in ksps (default: 1000)")
|
|
p_scan.add_argument('--sr-max', type=int, default=30000,
|
|
help="Maximum symbol rate in ksps (default: 30000)")
|
|
p_scan.add_argument('--sr-step', type=int, default=500,
|
|
help="Symbol rate step in ksps (default: 500)")
|
|
p_scan.add_argument('--lnb-lo', type=float, default=9750,
|
|
help="LNB LO frequency in MHz (default: 9750)")
|
|
p_scan.add_argument('--pol', choices=['H', 'V', 'L', 'R'],
|
|
help="Polarization (sets LNB voltage)")
|
|
p_scan.add_argument('--band', choices=['low', 'high'],
|
|
help="LNB band (sets 22 kHz tone)")
|
|
p_scan.add_argument('--json', action='store_true',
|
|
help="Output results as JSON")
|
|
p_scan.add_argument('--csv', metavar='FILE',
|
|
help="Save results to CSV")
|
|
|
|
# monitor
|
|
p_mon = sub.add_parser('monitor', help="Real-time signal monitor / dish alignment",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
p_mon.add_argument('freq', type=float,
|
|
help="Frequency in MHz (RF if --lnb-lo set, IF otherwise)")
|
|
p_mon.add_argument('sr', type=int,
|
|
help="Symbol rate in ksps")
|
|
p_mon.add_argument('--pol', choices=['H', 'V', 'L', 'R'],
|
|
help="Polarization (sets LNB voltage)")
|
|
p_mon.add_argument('--band', choices=['low', 'high'],
|
|
help="LNB band (sets 22 kHz tone)")
|
|
p_mon.add_argument('--lnb-lo', type=float, default=0,
|
|
help="LNB LO frequency in MHz (0=direct input)")
|
|
p_mon.add_argument('--rate', type=float, default=10,
|
|
help="Poll rate in Hz (default: 10, max ~50)")
|
|
p_mon.add_argument('--audio', action='store_true',
|
|
help="Pitch-proportional beep for hands-free alignment")
|
|
p_mon.add_argument('--peak-hold', action='store_true',
|
|
help="Track and display maximum signal seen")
|
|
p_mon.add_argument('--history', type=int, default=60,
|
|
help="Sparkline history length in samples (default: 60)")
|
|
p_mon.add_argument('--plot', action='store_true',
|
|
help="Show matplotlib plot after stopping")
|
|
|
|
# lband
|
|
p_lband = sub.add_parser('lband', help="L-band direct input analyzer",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""\
|
|
L-band mode uses direct input (no LNB) to monitor 950-2150 MHz.
|
|
Can detect carrier PRESENCE at any frequency even if modulation
|
|
is incompatible with the BCM4500 demodulator.
|
|
|
|
Known allocations in range:
|
|
1240-1300 MHz Amateur 23cm
|
|
1525-1559 MHz Inmarsat downlink
|
|
1559-1610 MHz GNSS (GPS L1, Galileo E1)
|
|
1610-1626 MHz Iridium downlink
|
|
1670-1710 MHz MetSat (GOES LRIT, NOAA HRPT)
|
|
1710-1785 MHz LTE/AWS uplink
|
|
1920-2025 MHz UMTS uplink
|
|
""")
|
|
p_lband.add_argument('--start', type=float, default=950,
|
|
help="Start frequency in MHz (default: 950)")
|
|
p_lband.add_argument('--stop', type=float, default=2150,
|
|
help="Stop frequency in MHz (default: 2150)")
|
|
p_lband.add_argument('--step', type=float, default=2,
|
|
help="Step size in MHz (default: 2)")
|
|
p_lband.add_argument('--dwell', type=int, default=20,
|
|
help="Dwell time per step in ms (default: 20)")
|
|
p_lband.add_argument('--23cm', dest='ham_23cm', action='store_true',
|
|
help="Narrow to 1240-1300 MHz with 500 kHz steps")
|
|
p_lband.add_argument('--band-info', action='store_true',
|
|
help="Print L-band allocation table")
|
|
p_lband.add_argument('--waterfall', action='store_true',
|
|
help="Waterfall display")
|
|
p_lband.add_argument('--plot', action='store_true',
|
|
help="Show matplotlib plot")
|
|
p_lband.add_argument('--csv', metavar='FILE',
|
|
help="Save sweep data to CSV")
|
|
|
|
# track
|
|
p_track = sub.add_parser('track', help="Carrier/beacon tracker with logging",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
p_track.add_argument('freq', type=float,
|
|
help="Frequency in MHz (RF if --lnb-lo set, IF otherwise)")
|
|
p_track.add_argument('sr', type=int,
|
|
help="Symbol rate in ksps")
|
|
p_track.add_argument('--pol', choices=['H', 'V', 'L', 'R'],
|
|
help="Polarization (sets LNB voltage)")
|
|
p_track.add_argument('--band', choices=['low', 'high'],
|
|
help="LNB band (sets 22 kHz tone)")
|
|
p_track.add_argument('--lnb-lo', type=float, default=0,
|
|
help="LNB LO frequency in MHz (0=direct input)")
|
|
p_track.add_argument('--rate', type=float, default=1,
|
|
help="Poll rate in Hz (default: 1)")
|
|
p_track.add_argument('--duration', type=float, default=None,
|
|
help="Tracking duration in seconds (default: until Ctrl-C)")
|
|
p_track.add_argument('--log', metavar='FILE',
|
|
help="Log CSV: timestamp, snr, agc, power, lock, status")
|
|
p_track.add_argument('--drift-track', action='store_true',
|
|
help="Periodically sweep narrow window to measure frequency drift")
|
|
p_track.add_argument('--plot', action='store_true',
|
|
help="Show matplotlib plot after stopping")
|
|
p_track.add_argument('--json-lines', metavar='FILE',
|
|
help="Log as JSON-lines (one JSON object per line)")
|
|
|
|
return parser
|
|
|
|
|
|
def main():
|
|
parser = build_parser()
|
|
args = parser.parse_args()
|
|
|
|
if not args.command:
|
|
parser.print_help()
|
|
sys.exit(0)
|
|
|
|
dispatch = {
|
|
'spectrum': cmd_spectrum,
|
|
'scan': cmd_scan,
|
|
'monitor': cmd_monitor,
|
|
'lband': cmd_lband,
|
|
'track': cmd_track,
|
|
}
|
|
|
|
handler = dispatch.get(args.command)
|
|
if handler is None:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
with SkyWalker1(verbose=args.verbose) as sw:
|
|
handler(sw, args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|