skywalker-1/tools/qo100.py
Ryan Malloy cc3a0707a1 Add DiSEqC motor control, QO-100 DATV reception, and carrier survey
Firmware v3.03.0: DiSEqC Manchester encoder (cmd 0x8D extended),
parameterized spectrum sweep (0xBA), adaptive blind scan (0xBB),
error code reporting (0xBC). All new function locals moved to XDATA
to fit within FX2LP 256-byte internal RAM constraint.

Motor control: DiSEqC 1.2 positioner with USALS GotoX, stored
positions, interactive keyboard jog, 30-second safety auto-halt.

QO-100 DATV: Es'hail-2 wideband transponder tools — LNB IF
calculator, narrowband scan, tune, and TS-to-video pipe (ffplay/mpv).

Carrier survey: six-stage pipeline (coarse sweep → peak detection →
fine sweep → blind scan → TS sample → catalog). JSON catalog with
differential analysis, QO-100 optimized mode, CSV/text export.

TUI: F9 Motor screen (3-column layout with signal gauge), F10 Survey
screen (Full Band + QO-100 tabs). Bridge, demo, and theme updated.

Docs: motor.mdx, survey.mdx, qo100-datv.mdx guide, tui.mdx updated
for 10 screens. Site builds 41 pages, all links valid.
2026-02-15 17:01:11 -07:00

774 lines
28 KiB
Python
Executable File

#!/usr/bin/env python3
"""
QO-100 (Es'hail-2) DATV reception tool for the Genpix SkyWalker-1.
Provides frequency calculation, band plan display, wideband transponder
scanning, tuning, and live video piping for QO-100 amateur television
signals received via the SkyWalker-1 DVB-S demodulator.
QO-100 wideband transponder: 10491-10499 MHz (DVB-S QPSK, various SRs)
Narrowband transponder: 10489.5-10490 MHz (SSB/CW, not demodulable)
Engineering beacon: 10489.75 MHz (CW)
The BCM4500 demodulator has a minimum symbol rate of 256 ksps. QO-100
DATV signals typically range from 333 ksps to 2000 ksps, well within
the hardware capability. Signals below 256 ksps are detectable as
energy via spectrum sweep but cannot be locked/demodulated.
"""
import sys
import os
import argparse
import time
import signal as signal_mod
import subprocess
# Add tools directory to path for library import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from skywalker_lib import (
SkyWalker1, MODULATIONS, FEC_RATES, MOD_FEC_GROUP,
rf_to_if, if_to_rf, detect_peaks, signal_bar,
)
# --- QO-100 constants ---
QO100_WB_START_MHZ = 10491.0 # wideband transponder start
QO100_WB_STOP_MHZ = 10499.0 # wideband transponder stop
QO100_BEACON_MHZ = 10489.75 # engineering beacon
QO100_ORBITAL_POS = 25.9 # degrees East (Es'hail-2)
QO100_NB_START_MHZ = 10489.5 # narrowband transponder start
QO100_NB_STOP_MHZ = 10490.0 # narrowband transponder stop
BCM4500_MIN_SR_KSPS = 256 # hardware minimum
# Common modified LNB local oscillators used for QO-100 reception
COMMON_QO100_LOS = {
9361: "Modified PLL LNB (TCXO, popular)",
9000: "Round LO",
9100: "Round LO",
9200: "Round LO",
9300: "Round LO",
9750: "Standard universal (low band)",
}
# Known DATV stations / frequencies on QO-100 wideband transponder
QO100_KNOWN_STATIONS = [
{"call": "BATC", "freq_mhz": 10491.5, "sr_ksps": 1500, "mod": "qpsk", "fec": "3/4", "note": "British ATV Club beacon"},
{"call": "Various", "freq_mhz": 10492.0, "sr_ksps": 1000, "mod": "qpsk", "fec": "1/2", "note": "Common DATV frequency"},
{"call": "Various", "freq_mhz": 10493.0, "sr_ksps": 500, "mod": "qpsk", "fec": "1/2", "note": "Low-power DATV"},
{"call": "Various", "freq_mhz": 10494.0, "sr_ksps": 333, "mod": "qpsk", "fec": "1/2", "note": "Minimum viable DVB-S"},
{"call": "Beacon", "freq_mhz": 10489.75, "sr_ksps": 0, "mod": "cw", "fec": "-", "note": "Engineering beacon (CW)"},
]
# --- Helper functions ---
def validate_qo100_lo(lnb_lo_mhz: int) -> dict:
"""
Check whether a given LNB LO places the QO-100 wideband transponder
within the SkyWalker-1 IF range (950-2150 MHz).
Returns dict with:
valid - bool, True if entire WB transponder fits in IF range
if_range - (start_if, stop_if) tuple in MHz
warnings - list of warning strings
"""
start_if = rf_to_if(QO100_WB_START_MHZ, lnb_lo_mhz)
stop_if = rf_to_if(QO100_WB_STOP_MHZ, lnb_lo_mhz)
warnings = []
if start_if < 950:
warnings.append(f"WB start IF {start_if:.0f} MHz is below 950 MHz minimum")
if stop_if > 2150:
warnings.append(f"WB stop IF {stop_if:.0f} MHz is above 2150 MHz maximum")
if start_if < 0 or stop_if < 0:
warnings.append(f"Negative IF -- LNB LO {lnb_lo_mhz} MHz is above the RF frequency")
# Check if LO is a known value
if lnb_lo_mhz not in COMMON_QO100_LOS:
nearby = [lo for lo in COMMON_QO100_LOS if abs(lo - lnb_lo_mhz) <= 100]
if not nearby:
warnings.append(f"LO {lnb_lo_mhz} MHz is not a common QO-100 value")
valid = (950 <= start_if) and (stop_if <= 2150)
return {
"valid": valid,
"if_range": (start_if, stop_if),
"warnings": warnings,
}
def qo100_if_range(lnb_lo_mhz: float) -> tuple:
"""Return (start_if, stop_if) in MHz for the QO-100 WB transponder."""
return (
rf_to_if(QO100_WB_START_MHZ, lnb_lo_mhz),
rf_to_if(QO100_WB_STOP_MHZ, lnb_lo_mhz),
)
def qo100_band_plan(lnb_lo_mhz: float) -> list:
"""
Return the QO-100 known station list augmented with IF frequencies
and lockability status for the given LNB LO.
Each entry is a dict with keys:
call, freq_mhz (RF), if_mhz, sr_ksps, mod, fec, note, lockable
"""
plan = []
for station in QO100_KNOWN_STATIONS:
if_mhz = rf_to_if(station["freq_mhz"], lnb_lo_mhz)
lockable = (
station["sr_ksps"] >= BCM4500_MIN_SR_KSPS
and station["mod"] in MODULATIONS
and 950 <= if_mhz <= 2150
)
plan.append({
"call": station["call"],
"freq_mhz": station["freq_mhz"],
"if_mhz": if_mhz,
"sr_ksps": station["sr_ksps"],
"mod": station["mod"],
"fec": station["fec"],
"note": station["note"],
"lockable": lockable,
})
return plan
def _resolve_fec(mod_name: str, fec_name: str) -> int:
"""Look up the FEC index for a given modulation and FEC rate string."""
fec_group = MOD_FEC_GROUP.get(mod_name)
if fec_group is None:
print(f"Unknown modulation: {mod_name}")
sys.exit(1)
fec_table = FEC_RATES[fec_group]
if fec_name not in fec_table:
print(f"Invalid FEC '{fec_name}' for {mod_name}")
print(f"Valid: {', '.join(fec_table.keys())}")
sys.exit(1)
return fec_table[fec_name]
def _print_lo_info(lnb_lo: float, verbose: bool = False) -> None:
"""Print LNB LO validation summary."""
lo_desc = COMMON_QO100_LOS.get(int(lnb_lo), "custom")
print(f" LNB LO: {lnb_lo:.0f} MHz ({lo_desc})")
check = validate_qo100_lo(lnb_lo)
start_if, stop_if = check["if_range"]
print(f" WB IF range: {start_if:.1f} - {stop_if:.1f} MHz")
if not check["valid"]:
print(f" WARNING: QO-100 WB transponder does not fit in IF range!")
for w in check["warnings"]:
print(f" WARNING: {w}")
if verbose:
beacon_if = rf_to_if(QO100_BEACON_MHZ, lnb_lo)
nb_start_if = rf_to_if(QO100_NB_START_MHZ, lnb_lo)
nb_stop_if = rf_to_if(QO100_NB_STOP_MHZ, lnb_lo)
print(f" Beacon IF: {beacon_if:.2f} MHz (CW, not demodulable)")
print(f" NB IF range: {nb_start_if:.1f} - {nb_stop_if:.1f} MHz (SSB/CW)")
# --- Subcommand handlers ---
def cmd_calc(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Show IF frequencies for a given LNB LO."""
lnb_lo = args.lnb_lo
print(f"QO-100 IF Frequency Calculator")
print(f"{'=' * 60}")
print(f" Satellite: Es'hail-2 (QO-100) at {QO100_ORBITAL_POS} deg E")
_print_lo_info(lnb_lo, verbose=True)
print(f"\n {'RF (MHz)':>12} {'IF (MHz)':>10} {'Description'}")
print(f" {'' * 50}")
entries = [
(QO100_NB_START_MHZ, "Narrowband start"),
(QO100_BEACON_MHZ, "Engineering beacon (CW)"),
(QO100_NB_STOP_MHZ, "Narrowband stop"),
(QO100_WB_START_MHZ, "Wideband start"),
(QO100_WB_STOP_MHZ, "Wideband stop"),
]
for rf, desc in entries:
if_mhz = rf_to_if(rf, lnb_lo)
in_range = 950 <= if_mhz <= 2150
marker = "" if in_range else " [OUT OF RANGE]"
print(f" {rf:12.2f} {if_mhz:10.2f} {desc}{marker}")
# Common LO comparison table
print(f"\n Common LNB LO comparison:")
print(f" {'LO (MHz)':>10} {'WB Start IF':>12} {'WB Stop IF':>12} {'Description'}")
print(f" {'' * 60}")
for lo, desc in sorted(COMMON_QO100_LOS.items()):
s_if = rf_to_if(QO100_WB_START_MHZ, lo)
e_if = rf_to_if(QO100_WB_STOP_MHZ, lo)
fits = 950 <= s_if and e_if <= 2150
status = "" if fits else " [!]"
current = " <--" if lo == int(lnb_lo) else ""
print(f" {lo:10d} {s_if:12.1f} {e_if:12.1f} {desc}{status}{current}")
def cmd_band_plan(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Show known QO-100 stations with IF conversion for this LNB LO."""
lnb_lo = args.lnb_lo
print(f"QO-100 Wideband Transponder Band Plan")
print(f"{'=' * 60}")
_print_lo_info(lnb_lo, verbose=args.verbose)
print()
plan = qo100_band_plan(lnb_lo)
# Table header
hdr = (f" {'Call':8s} {'RF MHz':>9s} {'IF MHz':>9s} "
f"{'SR ksps':>8s} {'Mod':5s} {'FEC':4s} {'Lock':4s} Note")
print(hdr)
print(f" {'' * 76}")
for entry in plan:
sr_str = f"{entry['sr_ksps']:>5d}" if entry["sr_ksps"] > 0 else " n/a"
if entry["lockable"]:
lock_str = " yes"
elif entry["sr_ksps"] == 0:
lock_str = " --"
elif entry["sr_ksps"] < BCM4500_MIN_SR_KSPS:
lock_str = " no"
elif entry["mod"] not in MODULATIONS:
lock_str = " no"
else:
lock_str = " no"
in_range = 950 <= entry["if_mhz"] <= 2150
if_str = f"{entry['if_mhz']:9.2f}" if in_range else f"{entry['if_mhz']:7.2f} !"
print(f" {entry['call']:8s} {entry['freq_mhz']:9.2f} {if_str} "
f"{sr_str:>8s} {entry['mod']:5s} {entry['fec']:4s} {lock_str} "
f"{entry['note']}")
# Legend
print(f"\n Lock column: yes = lockable by BCM4500 (SR >= {BCM4500_MIN_SR_KSPS} ksps, "
f"supported mod, IF in range)")
print(f" no = detectable as energy but not demodulable")
print(f" -- = not a digital signal (CW/SSB)")
def cmd_scan(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Scan the QO-100 wideband transponder for active carriers."""
lnb_lo = args.lnb_lo
step_mhz = args.step
dwell_ms = args.dwell
# Validate LO
check = validate_qo100_lo(lnb_lo)
if not check["valid"]:
print(f"ERROR: QO-100 WB transponder does not fit in IF range with LO {lnb_lo} MHz")
for w in check["warnings"]:
print(f" {w}")
sys.exit(1)
start_if, stop_if = check["if_range"]
# QO-100 optimized sweep parameters
# Low symbol rates need longer dwell, finer steps, lower measurement SR
sr_ksps = 1000 # lower SR for better sensitivity to narrow signals
steps = int((stop_if - start_if) / step_mhz) + 1
est_time = steps * (dwell_ms + 5) / 1000.0
print(f"QO-100 Wideband Transponder Scan")
print(f"{'=' * 60}")
_print_lo_info(lnb_lo, verbose=args.verbose)
print(f" RF range: {QO100_WB_START_MHZ:.1f} - {QO100_WB_STOP_MHZ:.1f} MHz")
print(f" IF range: {start_if:.1f} - {stop_if:.1f} MHz")
print(f" Step: {step_mhz} MHz ({steps} points)")
print(f" Dwell: {dwell_ms} ms")
print(f" Meas SR: {sr_ksps} ksps")
print(f" Est. time: {est_time:.1f}s")
print()
sw.ensure_booted()
# Sweep the wideband transponder IF range
print("[1/3] Sweeping wideband transponder...")
def progress(freq, step_num, total, result):
pct = (step_num + 1) / total * 100
rf = if_to_rf(freq, lnb_lo)
sys.stdout.write(f"\r [{pct:5.1f}%] IF {freq:.1f} MHz RF {rf:.1f} MHz"
f" pwr={result['power_db']:.1f} dB"
f" AGC={result['agc1']}")
sys.stdout.flush()
freqs, powers, results = sw.sweep_spectrum(
start_if, stop_if, step_mhz, dwell_ms, sr_ksps,
callback=progress if not args.verbose else None
)
sys.stdout.write("\r" + " " * 70 + "\r")
print(f" {len(freqs)} points measured")
# Peak detection
print(f"\n[2/3] Peak detection (threshold {args.threshold:.0f} dB)...")
peaks = detect_peaks(freqs, powers, threshold_db=args.threshold)
if not peaks:
print(" No carriers detected above noise floor.")
print(" Check dish alignment, LNB LO, and that the transponder is active.")
return
print(f" {len(peaks)} carrier(s) detected:")
print()
print(f" {'IF MHz':>8s} {'RF MHz':>10s} {'Power dB':>9s} Nearest known station")
print(f" {'' * 55}")
for freq_if, pwr, idx in peaks:
freq_rf = if_to_rf(freq_if, lnb_lo)
# Match to nearest known station
nearest = None
nearest_dist = 999
for station in QO100_KNOWN_STATIONS:
dist = abs(station["freq_mhz"] - freq_rf)
if dist < nearest_dist:
nearest_dist = dist
nearest = station
match_str = ""
if nearest and nearest_dist < 1.0:
lockable = nearest["sr_ksps"] >= BCM4500_MIN_SR_KSPS
lock_note = "" if lockable else " [below min SR]"
match_str = (f"{nearest['call']} ({nearest['sr_ksps']} ksps "
f"{nearest['mod']} {nearest['fec']}){lock_note}")
elif nearest and nearest_dist < 2.0:
match_str = f"near {nearest['call']} ({nearest_dist:.1f} MHz off)"
print(f" {freq_if:8.1f} {freq_rf:10.2f} {pwr:9.1f} {match_str}")
# Try locking each peak that could be a DATV signal
print(f"\n[3/3] Attempting lock on detected carriers...")
locked_count = 0
for freq_if, pwr, idx in peaks:
freq_rf = if_to_rf(freq_if, lnb_lo)
if_khz = int(freq_if * 1000)
# Try common QO-100 symbol rates, highest first
trial_srs = [1500, 1000, 500, 333, 256]
for sr in trial_srs:
if sr < BCM4500_MIN_SR_KSPS:
continue
sr_sps = sr * 1000
mod_index, _ = MODULATIONS["qpsk"]
fec_group = MOD_FEC_GROUP["qpsk"]
fec_index = FEC_RATES[fec_group]["auto"]
if args.verbose:
print(f" Trying {freq_rf:.2f} MHz SR {sr} ksps...", end="", flush=True)
sw.tune(sr_sps, if_khz, mod_index, fec_index)
time.sleep(0.3)
if sw.get_signal_lock():
sig = sw.get_signal_strength()
print(f" LOCKED {freq_rf:.2f} MHz SR {sr} ksps "
f"SNR {sig['snr_db']:.1f} dB {signal_bar(sig['snr_pct'], width=20)}")
locked_count += 1
break
elif args.verbose:
print(f" no lock")
print(f"\n Scan complete: {len(peaks)} carriers detected, {locked_count} locked")
def cmd_tune(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Tune to a specific QO-100 frequency."""
lnb_lo = args.lnb_lo
freq_rf = args.freq
sr_ksps = args.sr
mod_name = args.mod
fec_name = args.fec
# Validate
if sr_ksps < BCM4500_MIN_SR_KSPS:
print(f"ERROR: Symbol rate {sr_ksps} ksps is below BCM4500 minimum ({BCM4500_MIN_SR_KSPS} ksps)")
sys.exit(1)
if mod_name not in MODULATIONS:
print(f"Unknown modulation: {mod_name}")
print(f"Valid: {', '.join(MODULATIONS.keys())}")
sys.exit(1)
mod_index, mod_desc = MODULATIONS[mod_name]
fec_index = _resolve_fec(mod_name, fec_name)
if_mhz = rf_to_if(freq_rf, lnb_lo)
if_khz = int(if_mhz * 1000)
sr_sps = sr_ksps * 1000
if if_khz < 950000 or if_khz > 2150000:
print(f"ERROR: IF frequency {if_mhz:.1f} MHz is outside 950-2150 MHz range")
print(f" RF: {freq_rf} MHz, LNB LO: {lnb_lo} MHz")
sys.exit(1)
print(f"QO-100 Tune")
print(f"{'=' * 60}")
_print_lo_info(lnb_lo, verbose=args.verbose)
print(f" RF Frequency: {freq_rf} MHz")
print(f" IF Frequency: {if_mhz:.2f} MHz ({if_khz} kHz)")
print(f" Symbol Rate: {sr_ksps} ksps ({sr_sps} sps)")
print(f" Modulation: {mod_desc}")
print(f" FEC: {fec_name} (index {fec_index})")
print()
sw.ensure_booted()
# Tune
print("Sending tune command...", end="", flush=True)
sw.tune(sr_sps, if_khz, mod_index, fec_index)
print(" done")
# Wait for lock
timeout = args.timeout
print(f"Waiting for lock (timeout {timeout}s)...", end="", flush=True)
deadline = time.time() + timeout
locked = False
dots = 0
while time.time() < deadline:
if sw.get_signal_lock():
locked = True
break
print(".", end="", flush=True)
dots += 1
time.sleep(0.5)
print()
if locked:
sig = sw.get_signal_strength()
print(f"\n LOCKED")
print(f" SNR: {sig['snr_db']:.1f} dB (raw 0x{sig['snr_raw']:04X})")
print(f" Quality: {signal_bar(sig['snr_pct'])}")
else:
print(f"\n NO LOCK after {timeout}s")
print(f" Possible causes:")
print(f" - No signal at {freq_rf} MHz (station may be off-air)")
print(f" - Wrong symbol rate (try scanning first)")
print(f" - Dish not aligned to {QO100_ORBITAL_POS} deg E")
print(f" - LNB LO mismatch (expected {lnb_lo} MHz)")
def cmd_watch(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Tune to a QO-100 frequency and pipe the transport stream to a video player."""
lnb_lo = args.lnb_lo
freq_rf = args.freq
sr_ksps = args.sr
mod_name = args.mod
fec_name = args.fec
player_cmd = args.player
# Validate
if sr_ksps < BCM4500_MIN_SR_KSPS:
print(f"ERROR: Symbol rate {sr_ksps} ksps is below BCM4500 minimum ({BCM4500_MIN_SR_KSPS} ksps)")
sys.exit(1)
if mod_name not in MODULATIONS:
print(f"Unknown modulation: {mod_name}")
print(f"Valid: {', '.join(MODULATIONS.keys())}")
sys.exit(1)
mod_index, mod_desc = MODULATIONS[mod_name]
fec_index = _resolve_fec(mod_name, fec_name)
if_mhz = rf_to_if(freq_rf, lnb_lo)
if_khz = int(if_mhz * 1000)
sr_sps = sr_ksps * 1000
if if_khz < 950000 or if_khz > 2150000:
print(f"ERROR: IF frequency {if_mhz:.1f} MHz is outside 950-2150 MHz range")
print(f" RF: {freq_rf} MHz, LNB LO: {lnb_lo} MHz")
sys.exit(1)
# Status messages go to stderr so stdout is clean for piping
status = sys.stderr
status.write(f"QO-100 Watch\n")
status.write(f"{'=' * 60}\n")
status.write(f" RF Frequency: {freq_rf} MHz\n")
status.write(f" IF Frequency: {if_mhz:.2f} MHz\n")
status.write(f" Symbol Rate: {sr_ksps} ksps\n")
status.write(f" Modulation: {mod_desc}\n")
status.write(f" FEC: {fec_name}\n")
if player_cmd:
status.write(f" Player: {player_cmd}\n")
else:
status.write(f" Output: stdout (pipe to player)\n")
status.write(f"\n")
status.flush()
sw.ensure_booted()
# Tune and wait for lock
status.write("Tuning...\n")
status.flush()
sw.tune(sr_sps, if_khz, mod_index, fec_index)
timeout = args.timeout
deadline = time.time() + timeout
locked = False
while time.time() < deadline:
if sw.get_signal_lock():
locked = True
break
time.sleep(0.3)
if not locked:
status.write(f"NO LOCK after {timeout}s -- aborting\n")
status.flush()
sys.exit(1)
sig = sw.get_signal_strength()
status.write(f"LOCKED SNR {sig['snr_db']:.1f} dB {signal_bar(sig['snr_pct'], width=20)}\n")
status.flush()
# Open player subprocess or use stdout
player_proc = None
output_fd = None
if player_cmd:
try:
player_proc = subprocess.Popen(
player_cmd, shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
output_fd = player_proc.stdin
status.write(f"Player started (PID {player_proc.pid})\n")
status.flush()
except OSError as e:
status.write(f"Failed to start player: {e}\n")
status.flush()
sys.exit(1)
else:
output_fd = sys.stdout.buffer
# Stream
sw.arm_transfer(on=True)
status.write("Streaming...\n")
status.flush()
total_bytes = 0
start_time = time.time()
last_status = start_time
running = True
def stop_handler(signum, frame):
nonlocal running
running = False
signal_mod.signal(signal_mod.SIGINT, stop_handler)
signal_mod.signal(signal_mod.SIGTERM, stop_handler)
try:
while running:
# Check if player is still alive
if player_proc and player_proc.poll() is not None:
status.write(f"\nPlayer exited (code {player_proc.returncode})\n")
status.flush()
break
chunk = sw.read_stream(timeout=2000)
if chunk:
try:
output_fd.write(chunk)
output_fd.flush()
total_bytes += len(chunk)
except BrokenPipeError:
status.write("\nPipe closed\n")
status.flush()
break
now = time.time()
if now - last_status >= 2.0:
elapsed = now - start_time
bitrate = (total_bytes * 8) / elapsed if elapsed > 0 else 0
if bitrate >= 1e6:
rate_str = f"{bitrate / 1e6:.2f} Mbps"
else:
rate_str = f"{bitrate / 1e3:.1f} kbps"
# Quick signal check
still_locked = sw.get_signal_lock()
lock_str = "LOCK" if still_locked else "----"
status.write(f"\r [{lock_str}] {total_bytes:,} bytes "
f"{rate_str} ({elapsed:.0f}s) ")
status.flush()
last_status = now
finally:
sw.arm_transfer(on=False)
if player_proc:
player_proc.terminate()
player_proc.wait(timeout=5)
status.write(f"\n Stopped. Total: {total_bytes:,} bytes\n")
status.flush()
# --- CLI ---
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="qo100.py",
description="QO-100 (Es'hail-2) DATV reception tool for the SkyWalker-1",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s calc --lnb-lo 9750
%(prog)s calc --lnb-lo 9361
%(prog)s band-plan --lnb-lo 9750
%(prog)s scan --lnb-lo 9750
%(prog)s scan --lnb-lo 9361 --step 0.25 --dwell 100
%(prog)s tune --lnb-lo 9750 --freq 10491.5 --sr 1500
%(prog)s tune --lnb-lo 9750 --freq 10491.5 --sr 1500 --fec 3/4
%(prog)s watch --lnb-lo 9750 --freq 10491.5 --sr 1500 --player "ffplay -f mpegts -i pipe:0"
%(prog)s watch --lnb-lo 9750 --freq 10491.5 --sr 1500 --player "mpv -"
%(prog)s watch --lnb-lo 9750 --freq 10491.5 --sr 1500 | vlc -
QO-100 wideband transponder: 10491-10499 MHz (DVB-S QPSK, various SRs)
BCM4500 minimum symbol rate: 256 ksps
Common LNB LOs: 9750 (universal), 9361 (TCXO PLL, popular for QO-100)
The --lnb-lo parameter is required for all commands. It must match your
LNB's actual local oscillator frequency for correct IF calculation.
""")
parser.add_argument('-v', '--verbose', action='store_true',
help="Verbose output (USB traffic, extra detail)")
sub = parser.add_subparsers(dest='command')
# calc
p_calc = sub.add_parser('calc',
help="Show IF frequencies for a given LNB LO",
formatter_class=argparse.RawDescriptionHelpFormatter)
p_calc.add_argument('--lnb-lo', type=float, required=True,
help="LNB local oscillator frequency in MHz")
# band-plan
p_bp = sub.add_parser('band-plan',
help="Show known QO-100 stations with IF conversion",
formatter_class=argparse.RawDescriptionHelpFormatter)
p_bp.add_argument('--lnb-lo', type=float, required=True,
help="LNB local oscillator frequency in MHz")
# scan
p_scan = sub.add_parser('scan',
help="Scan wideband transponder for active carriers",
formatter_class=argparse.RawDescriptionHelpFormatter)
p_scan.add_argument('--lnb-lo', type=float, required=True,
help="LNB local oscillator frequency in MHz")
p_scan.add_argument('--step', type=float, default=0.5,
help="Frequency step in MHz (default: 0.5, finer for low-SR)")
p_scan.add_argument('--dwell', type=int, default=75,
help="Dwell time per step in ms (default: 75, longer for sensitivity)")
p_scan.add_argument('--threshold', type=float, default=3.0,
help="Peak detection threshold in dB (default: 3.0)")
# tune
p_tune = sub.add_parser('tune',
help="Tune to a specific QO-100 frequency",
formatter_class=argparse.RawDescriptionHelpFormatter)
p_tune.add_argument('--lnb-lo', type=float, required=True,
help="LNB local oscillator frequency in MHz")
p_tune.add_argument('--freq', type=float, required=True,
help="RF frequency in MHz (e.g. 10491.5)")
p_tune.add_argument('--sr', type=int, required=True,
help="Symbol rate in ksps (e.g. 1500)")
p_tune.add_argument('--mod', default='qpsk',
choices=list(MODULATIONS.keys()),
help="Modulation type (default: qpsk)")
p_tune.add_argument('--fec', default='auto',
help="FEC rate (default: auto)")
p_tune.add_argument('--timeout', type=float, default=10,
help="Lock timeout in seconds (default: 10)")
# watch
p_watch = sub.add_parser('watch',
help="Tune and pipe transport stream to video player",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Watch pipes the raw MPEG-2 transport stream to a video player or stdout.
Status output goes to stderr so the TS data on stdout stays clean.
Player examples:
--player "ffplay -f mpegts -i pipe:0"
--player "mpv --demuxer=lavf -"
--player "vlc --demux ts -"
Without --player, the TS stream is written to stdout for shell piping:
qo100.py watch --lnb-lo 9750 --freq 10491.5 --sr 1500 | vlc -
""")
p_watch.add_argument('--lnb-lo', type=float, required=True,
help="LNB local oscillator frequency in MHz")
p_watch.add_argument('--freq', type=float, required=True,
help="RF frequency in MHz (e.g. 10491.5)")
p_watch.add_argument('--sr', type=int, required=True,
help="Symbol rate in ksps (e.g. 1500)")
p_watch.add_argument('--mod', default='qpsk',
choices=list(MODULATIONS.keys()),
help="Modulation type (default: qpsk)")
p_watch.add_argument('--fec', default='auto',
help="FEC rate (default: auto)")
p_watch.add_argument('--player', default=None,
help="Player command (e.g. 'ffplay -f mpegts -i pipe:0')")
p_watch.add_argument('--timeout', type=float, default=15,
help="Lock timeout in seconds (default: 15)")
return parser
def main():
parser = build_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(0)
# calc and band-plan don't need the device
if args.command in ('calc', 'band-plan'):
dispatch = {
'calc': cmd_calc,
'band-plan': cmd_band_plan,
}
handler = dispatch[args.command]
handler(None, args)
return
dispatch = {
'scan': cmd_scan,
'tune': cmd_tune,
'watch': cmd_watch,
}
handler = dispatch.get(args.command)
if handler is None:
parser.print_help()
sys.exit(1)
with SkyWalker1(verbose=args.verbose) as sw:
handler(sw, args)
if __name__ == '__main__':
main()