Add Textual TUI for SkyWalker-1 RF tool

Separate entry point (skywalker-tui) that reuses skywalker_lib.py
unchanged. Five RF modes: spectrum, scan, monitor, lband, track —
each with threaded USB bridge workers for non-blocking I/O.

Includes --demo mode with synthetic signal generation (Gaussian
peaks, noise floor, AGC simulation) for development without hardware.

Custom widgets: spectrum bar chart, rolling waterfall, signal gauge,
sparkline history, transponder table, device status bar.
This commit is contained in:
Ryan Malloy 2026-02-13 04:39:55 -07:00
parent c4bfe33d61
commit 64c33985a3
21 changed files with 2762 additions and 0 deletions

4
.gitignore vendored
View File

@ -8,6 +8,10 @@ firmware/fx2lib/
firmware/build/
tools/__pycache__/
# TUI
tui/.venv/
tui/__pycache__/
# Documentation site
site/node_modules/
site/dist/

24
tui/pyproject.toml Normal file
View File

@ -0,0 +1,24 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "skywalker-tui"
version = "0.1.0"
description = "Textual TUI for Genpix SkyWalker-1 DVB-S receiver"
requires-python = ">=3.11"
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
dependencies = [
"textual>=3.0",
"pyusb>=1.3",
]
[project.scripts]
skywalker-tui = "skywalker_tui.app:main"
[tool.hatch.build.targets.wheel]
packages = ["src/skywalker_tui"]
[tool.ruff]
target-version = "py311"
line-length = 100

View File

@ -0,0 +1,3 @@
"""Textual TUI for Genpix SkyWalker-1 DVB-S receiver."""
__version__ = "0.1.0"

View File

@ -0,0 +1,165 @@
"""SkyWalker-1 TUI — main application.
Provides mode switching between 5 RF operating modes via a sidebar and F-key
shortcuts. Each mode is a Screen subclass that manages its own workers.
Note: We use "rf_mode" terminology for our 5 operating modes to avoid colliding
with Textual's built-in App.mode / _current_mode / _screen_stacks system.
"""
import argparse
import sys
import os
from pathlib import Path
# Add tools directory to path for skywalker_lib import
_tools_dir = str(Path(__file__).resolve().parent.parent.parent.parent / "tools")
if _tools_dir not in sys.path:
sys.path.insert(0, _tools_dir)
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.widgets import Header, Footer, Button, Label, Static, ContentSwitcher
from skywalker_tui.bridge import USBBridge
from skywalker_tui.demo import DemoDevice
from skywalker_tui.widgets.status_bar import DeviceStatusBar
from skywalker_tui.screens.spectrum import SpectrumScreen
from skywalker_tui.screens.scan import ScanScreen
from skywalker_tui.screens.monitor import MonitorScreen
from skywalker_tui.screens.lband import LBandScreen
from skywalker_tui.screens.track import TrackScreen
MODES = {
"spectrum": ("F1 Spectrum", SpectrumScreen),
"scan": ("F2 Scan", ScanScreen),
"monitor": ("F3 Monitor", MonitorScreen),
"lband": ("F4 L-Band", LBandScreen),
"track": ("F5 Track", TrackScreen),
}
class SkyWalkerApp(App):
"""Textual TUI for Genpix SkyWalker-1 DVB-S receiver."""
TITLE = "SkyWalker-1"
SUB_TITLE = "DVB-S RF Tool"
CSS_PATH = "theme.tcss"
BINDINGS = [
Binding("f1", "rf_mode('spectrum')", "Spectrum", show=True),
Binding("f2", "rf_mode('scan')", "Scan", show=True),
Binding("f3", "rf_mode('monitor')", "Monitor", show=True),
Binding("f4", "rf_mode('lband')", "L-Band", show=True),
Binding("f5", "rf_mode('track')", "Track", show=True),
Binding("q", "quit", "Quit", show=True),
Binding("d", "toggle_dark", "Theme", show=True),
]
def __init__(self, bridge: USBBridge, initial_mode: str = "spectrum"):
super().__init__()
self._bridge = bridge
self._initial_rf_mode = initial_mode
self._active_rf_mode = initial_mode
self._rf_screens: dict[str, object] = {}
def compose(self) -> ComposeResult:
yield Header()
with Horizontal():
with Vertical(id="sidebar"):
yield Label("[bold #00d4aa]SkyWalker-1[/]", classes="sidebar-heading")
yield Label("[#506878]DVB-S RF Tool[/]", classes="sidebar-heading")
yield Static("")
for mode_key, (label, _cls) in MODES.items():
yield Button(label, id=f"btn-{mode_key}", classes="mode-button")
yield Static("")
yield DeviceStatusBar(self._bridge)
yield ContentSwitcher(id="content-area")
yield Footer()
def on_mount(self) -> None:
# Initialize status bar
status = self.query_one(DeviceStatusBar)
status.update_status(self._bridge)
# Install all mode screens into the content switcher
switcher = self.query_one("#content-area", ContentSwitcher)
for mode_key, (_label, cls) in MODES.items():
screen = cls(self._bridge, id=f"screen-{mode_key}")
self._rf_screens[mode_key] = screen
switcher.mount(screen)
# Activate initial mode
self.action_rf_mode(self._initial_rf_mode)
def action_rf_mode(self, mode: str) -> None:
"""Switch to a different RF operating mode."""
if mode not in MODES:
return
self._active_rf_mode = mode
switcher = self.query_one("#content-area", ContentSwitcher)
switcher.current = f"screen-{mode}"
# Update sidebar button highlights
for mode_key in MODES:
btn = self.query_one(f"#btn-{mode_key}", Button)
btn.remove_class("-active")
self.query_one(f"#btn-{mode}", Button).add_class("-active")
self.sub_title = f"DVB-S RF Tool — {MODES[mode][0]}"
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle sidebar mode button clicks."""
btn_id = event.button.id or ""
if btn_id.startswith("btn-"):
mode = btn_id[4:]
if mode in MODES:
self.action_rf_mode(mode)
def action_toggle_dark(self) -> None:
self.dark = not self.dark
def main():
parser = argparse.ArgumentParser(
prog="skywalker-tui",
description="Textual TUI for Genpix SkyWalker-1 DVB-S receiver",
)
parser.add_argument(
"--demo", action="store_true",
help="Use synthetic signal data (no hardware required)",
)
parser.add_argument(
"mode", nargs="?", default="spectrum",
choices=list(MODES.keys()),
help="Initial mode (default: spectrum)",
)
parser.add_argument(
"-v", "--verbose", action="store_true",
help="Verbose USB logging (hardware mode only)",
)
args = parser.parse_args()
if args.demo:
device = DemoDevice()
bridge = USBBridge(device)
else:
try:
from skywalker_lib import SkyWalker1
device = SkyWalker1(verbose=args.verbose)
device.open()
bridge = USBBridge(device)
except Exception as e:
print(f"Cannot open SkyWalker-1: {e}", file=sys.stderr)
print("Use --demo for synthetic signal data.", file=sys.stderr)
sys.exit(1)
app = SkyWalkerApp(bridge=bridge, initial_mode=args.mode)
try:
app.run()
finally:
bridge.close()

View File

@ -0,0 +1,94 @@
"""Thread-safe bridge between Textual async event loop and blocking pyusb calls.
All SkyWalker1 (or DemoDevice) methods are blocking I/O they perform USB control
transfers that can take 2-200ms each. Textual's event loop is asyncio-based, so
calling these directly would freeze the UI.
The USBBridge wraps every device method behind a threading.Lock to prevent concurrent
USB access (the BCM4500 can't handle overlapping control transfers) and exposes them
as plain synchronous methods meant to be called from Textual @work(thread=True) workers.
"""
import threading
class USBBridge:
"""Thread-safe wrapper around SkyWalker1 or DemoDevice."""
def __init__(self, device):
self._dev = device
self._lock = threading.Lock()
@property
def is_demo(self) -> bool:
return hasattr(self._dev, "_demo")
def open(self):
with self._lock:
if hasattr(self._dev, "open"):
self._dev.open()
def close(self):
with self._lock:
if hasattr(self._dev, "close"):
self._dev.close()
def get_fw_version(self) -> dict:
with self._lock:
return self._dev.get_fw_version()
def get_config(self) -> int:
with self._lock:
return self._dev.get_config()
def ensure_booted(self):
with self._lock:
self._dev.ensure_booted()
def signal_monitor(self) -> dict:
with self._lock:
return self._dev.signal_monitor()
def sweep_spectrum(self, start_mhz: float, stop_mhz: float,
step_mhz: float = 5.0, dwell_ms: int = 10,
sr_ksps: int = 20000, mod_index: int = 0,
fec_index: int = 5, callback=None) -> tuple:
with self._lock:
return self._dev.sweep_spectrum(
start_mhz, stop_mhz, step_mhz, dwell_ms,
sr_ksps, mod_index, fec_index, callback,
)
def tune_monitor(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int,
dwell_ms: int = 10) -> dict:
with self._lock:
return self._dev.tune_monitor(
symbol_rate_sps, freq_khz, mod_index, fec_index, dwell_ms,
)
def tune(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int):
with self._lock:
self._dev.tune(symbol_rate_sps, freq_khz, mod_index, fec_index)
def set_lnb_voltage(self, high: bool):
with self._lock:
self._dev.set_lnb_voltage(high)
def set_22khz_tone(self, on: bool):
with self._lock:
self._dev.set_22khz_tone(on)
def configure_lnb(self, pol=None, band=None, lnb_lo=None,
disable_lnb=False) -> float:
with self._lock:
return self._dev.configure_lnb(pol, band, lnb_lo, disable_lnb)
def blind_scan(self, freq_khz: int, sr_min: int, sr_max: int,
sr_step: int) -> dict | None:
"""Run blind scan at a single frequency. Returns result dict or None."""
with self._lock:
if hasattr(self._dev, "blind_scan"):
return self._dev.blind_scan(freq_khz, sr_min, sr_max, sr_step)
return None

View File

@ -0,0 +1,243 @@
"""Synthetic signal generator for --demo mode.
DemoDevice mimics the SkyWalker1 API without any USB hardware. It generates
realistic-looking signal data with:
- Gaussian peaks at known satellite transponder positions
- Thermal noise floor with Gaussian jitter
- AGC values inversely correlated to signal strength
- Occasional lock/unlock transitions based on SNR threshold
- Slow drift in SNR to simulate atmospheric effects
This enables full TUI development and testing without hardware.
"""
import math
import random
import time
# Simulated transponder positions (IF MHz) and relative strengths
_TRANSPONDERS = [
(1050, -12.0, 27500), # strong Ku-band TP
(1220, -18.0, 22000), # moderate TP
(1480, -25.0, 30000), # weak TP
(1750, -15.0, 20000), # moderate TP
(1950, -22.0, 13000), # weaker TP
]
_NOISE_FLOOR = -35.0
_LOCK_THRESHOLD_DB = 3.5
class DemoDevice:
"""Drop-in replacement for SkyWalker1 that generates synthetic data."""
_demo = True # marker for bridge.is_demo
def __init__(self):
self._booted = False
self._lnb_on = False
self._lnb_voltage_high = False
self._tone_22khz = False
self._tuned_freq_khz = 0
self._tuned_sr_sps = 0
self._start_time = time.monotonic()
self._sample_count = 0
def open(self):
pass
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, *exc):
pass
def get_fw_version(self) -> dict:
return {
"major": 3,
"minor": 2,
"patch": 0,
"version": "3.02.0",
"date": "2025-02-10",
}
def get_config(self) -> int:
bits = 0
if self._booted:
bits |= 0x01 # 8PSK started
bits |= 0x02 # FW loaded
if self._lnb_on:
bits |= 0x04 # LNB power
if self._lnb_voltage_high:
bits |= 0x20 # 18V
if self._tone_22khz:
bits |= 0x10 # 22 kHz
if self._tuned_freq_khz > 0:
bits |= 0x40 # tuned
return bits
def ensure_booted(self):
self._booted = True
self._lnb_on = True
def set_lnb_voltage(self, high: bool):
self._lnb_voltage_high = high
def set_22khz_tone(self, on: bool):
self._tone_22khz = on
def configure_lnb(self, pol=None, band=None, lnb_lo=None,
disable_lnb=False) -> float:
if disable_lnb:
self._lnb_on = False
return 0.0
if pol:
self._lnb_voltage_high = pol.upper() in ("H", "L")
if band:
self._tone_22khz = band == "high"
if lnb_lo is not None:
return lnb_lo
elif band == "high":
return 10600.0
else:
return 9750.0
def start_intersil(self, on: bool = True):
self._lnb_on = on
def tune(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int):
self._tuned_freq_khz = freq_khz
self._tuned_sr_sps = symbol_rate_sps
def signal_monitor(self) -> dict:
"""Generate synthetic signal data at the currently tuned frequency."""
self._sample_count += 1
elapsed = time.monotonic() - self._start_time
freq_mhz = self._tuned_freq_khz / 1000.0 if self._tuned_freq_khz else 1200.0
power_db = self._power_at(freq_mhz, elapsed)
snr_db = max(0.0, power_db - _NOISE_FLOOR + random.gauss(0, 0.3))
locked = snr_db > _LOCK_THRESHOLD_DB
snr_raw = int(snr_db * 256)
agc1, agc2 = self._power_to_agc(power_db)
return {
"snr_raw": snr_raw,
"snr_db": snr_db,
"snr_pct": min(100.0, snr_raw * 17 / 65535 * 100),
"agc1": agc1,
"agc2": agc2,
"power_db": power_db,
"lock": 0x20 if locked else 0x00,
"locked": locked,
"status": 0x01 if self._booted else 0x00,
}
def tune_monitor(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int,
dwell_ms: int = 10) -> dict:
"""Simulate a tune + measure at a single frequency."""
# Simulate dwell time (scaled down for responsiveness)
time.sleep(min(dwell_ms, 5) / 1000.0)
freq_mhz = freq_khz / 1000.0
elapsed = time.monotonic() - self._start_time
power_db = self._power_at(freq_mhz, elapsed)
snr_db = max(0.0, power_db - _NOISE_FLOOR + random.gauss(0, 0.2))
locked = snr_db > _LOCK_THRESHOLD_DB
snr_raw = int(snr_db * 256)
agc1, agc2 = self._power_to_agc(power_db)
return {
"snr_raw": snr_raw,
"snr_db": snr_db,
"agc1": agc1,
"agc2": agc2,
"power_db": power_db,
"lock": 0x20 if locked else 0x00,
"locked": locked,
"status": 0x01,
"dwell_ms": dwell_ms,
}
def sweep_spectrum(self, start_mhz: float, stop_mhz: float,
step_mhz: float = 5.0, dwell_ms: int = 10,
sr_ksps: int = 20000, mod_index: int = 0,
fec_index: int = 5, callback=None) -> tuple:
"""Sweep with synthetic data. Same return signature as SkyWalker1."""
sr_sps = sr_ksps * 1000
freqs = []
powers = []
results = []
freq = start_mhz
steps = max(1, int((stop_mhz - start_mhz) / step_mhz) + 1)
step_num = 0
while freq <= stop_mhz:
freq_khz = int(freq * 1000)
result = self.tune_monitor(sr_sps, freq_khz, mod_index, fec_index, dwell_ms)
freqs.append(freq)
powers.append(result["power_db"])
results.append(result)
if callback:
callback(freq, step_num, steps, result)
step_num += 1
freq += step_mhz
return freqs, powers, results
def blind_scan(self, freq_khz: int, sr_min: int, sr_max: int,
sr_step: int) -> dict | None:
"""Simulate blind scan — lock onto nearby transponders."""
freq_mhz = freq_khz / 1000.0
for tp_freq, tp_power, tp_sr in _TRANSPONDERS:
if abs(freq_mhz - tp_freq) < 15:
sr_sps = tp_sr * 1000
if sr_min <= sr_sps <= sr_max:
return {
"freq_khz": int(tp_freq * 1000),
"sr_sps": sr_sps,
"locked": True,
}
return None
# --- Internal signal model ---
def _power_at(self, freq_mhz: float, elapsed: float) -> float:
"""Calculate synthetic power at a given frequency and time."""
# Start with noise floor + jitter
power = _NOISE_FLOOR + random.gauss(0, 0.5)
# Add Gaussian peaks for each simulated transponder
for tp_freq, tp_peak, _sr in _TRANSPONDERS:
# Bandwidth ~15 MHz sigma
dist = (freq_mhz - tp_freq)
gauss = math.exp(-(dist ** 2) / (2 * 12.0 ** 2))
# Slow atmospheric drift: +-2 dB over 30s period
drift = 2.0 * math.sin(elapsed / 30.0 * 2 * math.pi + tp_freq / 100.0)
power += (tp_peak - _NOISE_FLOOR + drift) * gauss
return power
@staticmethod
def _power_to_agc(power_db: float) -> tuple[int, int]:
"""Convert power_db back to simulated AGC register values."""
# Invert the agc_to_power_db formula: power = -40 * (combined / 65535)
# combined = power / -40 * 65535
if power_db >= 0:
combined = 0
else:
combined = int(min(65535, abs(power_db) / 40.0 * 65535))
agc1 = min(65535, combined)
agc2 = random.randint(0, 255) << 4 # fine adjustment noise
return agc1, agc2

View File

@ -0,0 +1 @@
"""Mode screens for SkyWalker-1 TUI."""

View File

@ -0,0 +1,221 @@
"""L-Band screen — direct input spectrum analyzer with allocation annotations.
Same sweep mechanics as the spectrum screen, but with LNB disabled (direct input)
and band allocation overlays showing what service each frequency range belongs to.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "tools"))
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Label, Input, Button, Static, ProgressBar, Checkbox
from textual import work
from textual.worker import Worker
from skywalker_lib import LBAND_ALLOCATIONS
from skywalker_tui.widgets.spectrum_plot import SpectrumPlot
from skywalker_tui.widgets.waterfall import WaterfallDisplay
def _alloc_table(start: float, stop: float) -> str:
"""Build a Rich-markup allocation reference for the visible range."""
lines = ["[#00d4aa bold]L-Band Allocations in range:[/]"]
colors = ["#60a0c0", "#80b060", "#c0a050", "#a06080", "#50a0a0", "#a08060", "#6080a0"]
for i, (lo, hi, name) in enumerate(LBAND_ALLOCATIONS):
if lo < stop and hi > start:
overlap_lo = max(lo, start)
overlap_hi = min(hi, stop)
c = colors[i % len(colors)]
lines.append(f" [{c}]{overlap_lo:.0f}-{overlap_hi:.0f} MHz {name}[/]")
if len(lines) == 1:
lines.append(" [#506878](none in range)[/]")
return "\n".join(lines)
class LBandScreen(Screen):
"""L-band direct input analyzer with allocation annotations."""
DEFAULT_CSS = """
LBandScreen {
layout: vertical;
}
LBandScreen #lband-main {
height: 1fr;
layout: horizontal;
}
LBandScreen #lband-plot-col {
width: 2fr;
layout: vertical;
}
LBandScreen #lband-info-col {
width: 1fr;
padding: 1;
background: #0e1420;
border-left: solid #1a2a3a;
layout: vertical;
}
LBandScreen #lband-alloc-panel {
height: auto;
padding: 1;
}
LBandScreen #lband-progress {
height: 3;
layout: horizontal;
padding: 0 2;
background: #0e1018;
}
LBandScreen #lband-progress Static {
width: auto;
margin: 1 1 0 0;
}
LBandScreen #lband-progress ProgressBar {
width: 1fr;
margin: 1 1 0 0;
}
LBandScreen #lband-controls {
height: auto;
padding: 1 2;
background: #0e1018;
border-top: solid #1a2a3a;
layout: horizontal;
}
LBandScreen #lband-controls Label {
width: auto;
margin: 1 1 0 0;
color: #506878;
}
LBandScreen #lband-controls Input {
width: 10;
margin: 0 1;
}
LBandScreen #lband-controls Button {
margin: 0 1;
}
"""
def __init__(self, bridge, **kwargs):
super().__init__(**kwargs)
self._bridge = bridge
self._sweeping = False
self._sweep_worker: Worker | None = None
def compose(self) -> ComposeResult:
with Horizontal(id="lband-main"):
with Vertical(id="lband-plot-col"):
yield SpectrumPlot(title="L-Band Spectrum (Direct Input)",
id="lband-plot")
yield WaterfallDisplay(title="Waterfall", id="lband-waterfall")
with Vertical(id="lband-info-col"):
yield Static(_alloc_table(950, 2150), id="lband-alloc-panel")
with Horizontal(id="lband-progress"):
yield Static("[#506878]Ready[/]", id="lband-status")
yield ProgressBar(total=100, show_eta=False, id="lband-pbar")
with Horizontal(id="lband-controls"):
yield Label("Start:")
yield Input("950", id="lband-start")
yield Label("Stop:")
yield Input("2150", id="lband-stop")
yield Label("Step:")
yield Input("2", id="lband-step")
yield Label("Dwell:")
yield Input("20", id="lband-dwell")
yield Button("23cm", id="lband-23cm-btn")
yield Button("Sweep", id="lband-sweep-btn", variant="success")
yield Button("Stop", id="lband-stop-btn", variant="error")
def on_mount(self) -> None:
if self._bridge.is_demo:
self._start_sweep()
def on_unmount(self) -> None:
self._stop_sweep()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "lband-sweep-btn":
self._start_sweep()
elif event.button.id == "lband-stop-btn":
self._stop_sweep()
elif event.button.id == "lband-23cm-btn":
self.query_one("#lband-start", Input).value = "1240"
self.query_one("#lband-stop", Input).value = "1300"
self.query_one("#lband-step", Input).value = "0.5"
# Update allocation display
self.query_one("#lband-alloc-panel", Static).update(
_alloc_table(1240, 1300)
)
def _start_sweep(self) -> None:
if self._sweeping:
return
self._sweeping = True
start = float(self.query_one("#lband-start", Input).value or "950")
stop = float(self.query_one("#lband-stop", Input).value or "2150")
step = float(self.query_one("#lband-step", Input).value or "2")
dwell = int(self.query_one("#lband-dwell", Input).value or "20")
# Update allocation panel for current range
self.query_one("#lband-alloc-panel", Static).update(
_alloc_table(start, stop)
)
self._sweep_worker = self._do_sweep(start, stop, step, dwell)
def _stop_sweep(self) -> None:
self._sweeping = False
if self._sweep_worker:
self._sweep_worker.cancel()
self._sweep_worker = None
@work(thread=True)
def _do_sweep(self, start: float, stop: float, step: float, dwell: int) -> None:
"""L-band sweep with LNB disabled."""
try:
self._bridge.ensure_booted()
# Disable LNB for direct input
self._bridge.configure_lnb(disable_lnb=True)
except Exception:
pass
def progress_cb(freq, step_num, total, result):
pct = (step_num + 1) / total * 100
self.app.call_from_thread(self._update_progress, pct, freq)
self.app.call_from_thread(self._set_status, "Sweeping...")
freqs, powers, results = self._bridge.sweep_spectrum(
start, stop, step, dwell, sr_ksps=20000, callback=progress_cb,
)
self.app.call_from_thread(self._show_results, freqs, powers, results)
self._sweeping = False
def _update_progress(self, pct: float, freq: float) -> None:
if not self.is_mounted:
return
self.query_one("#lband-pbar", ProgressBar).update(progress=pct)
self.query_one("#lband-status", Static).update(
f"[#00d4aa]{freq:.1f} MHz[/]"
)
def _set_status(self, msg: str) -> None:
if not self.is_mounted:
return
self.query_one("#lband-status", Static).update(f"[#506878]{msg}[/]")
def _show_results(self, freqs, powers, results) -> None:
if not self.is_mounted:
return
self.query_one("#lband-plot", SpectrumPlot).update_data(
freqs, powers, results, lnb_lo=0.0,
)
self.query_one("#lband-waterfall", WaterfallDisplay).add_sweep(powers)
self.query_one("#lband-status", Static).update("[#506878]Complete[/]")
self.query_one("#lband-pbar", ProgressBar).update(progress=100)

View File

@ -0,0 +1,206 @@
"""Monitor screen — real-time signal strength at a single frequency.
This is the dish-alignment / signal-monitoring mode. It polls signal_monitor()
at a configurable rate and displays SNR, power, lock state, and a rolling
sparkline history.
"""
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Label, Input, Button, Static
from textual import work
from textual.worker import Worker, WorkerState
from skywalker_tui.widgets.signal_gauge import SignalGauge
from skywalker_tui.widgets.sparkline_widget import SparklineWidget
class MonitorScreen(Screen):
"""Real-time signal monitor with gauge and sparkline."""
DEFAULT_CSS = """
MonitorScreen {
layout: vertical;
}
MonitorScreen #monitor-main {
height: 1fr;
layout: vertical;
padding: 1 2;
}
MonitorScreen #monitor-controls {
height: auto;
padding: 1 2;
background: #0e1018;
border-top: solid #1a2a3a;
layout: horizontal;
}
MonitorScreen #monitor-controls Label {
width: auto;
margin: 1 1 0 0;
color: #506878;
}
MonitorScreen #monitor-controls Input {
width: 14;
margin: 0 1;
}
MonitorScreen #monitor-controls Button {
margin: 0 1;
}
MonitorScreen #monitor-stats {
height: 3;
layout: horizontal;
padding: 0 2;
}
MonitorScreen #monitor-stats Static {
width: 1fr;
height: 3;
content-align: center middle;
background: #121c2a;
border: round #1a3050;
margin: 0 1 0 0;
}
"""
BINDINGS = [
("space", "toggle_poll", "Start/Stop"),
]
def __init__(self, bridge, **kwargs):
super().__init__(**kwargs)
self._bridge = bridge
self._polling = False
self._poll_worker: Worker | None = None
self._sample_count = 0
self._peak_snr = 0.0
def compose(self) -> ComposeResult:
with Vertical(id="monitor-main"):
yield SignalGauge(id="monitor-gauge")
yield SparklineWidget(title="SNR History", color="#00d4aa",
id="snr-sparkline")
yield SparklineWidget(title="Power History", color="#2196f3",
id="power-sparkline")
with Horizontal(id="monitor-stats"):
yield Static("[#506878]Samples:[/] [#00d4aa]0[/]", id="stat-samples")
yield Static("[#506878]Peak SNR:[/] [#00d4aa]0.0 dB[/]", id="stat-peak")
yield Static("[#506878]Status:[/] [#e8a020]Stopped[/]", id="stat-status")
with Horizontal(id="monitor-controls"):
yield Label("Freq (MHz):")
yield Input("1200", id="mon-freq")
yield Label("SR (ksps):")
yield Input("20000", id="mon-sr")
yield Label("Rate (Hz):")
yield Input("5", id="mon-rate")
yield Button("Start", id="mon-start", variant="success")
yield Button("Stop", id="mon-stop", variant="error")
def on_mount(self) -> None:
# Auto-start polling in demo mode
if self._bridge.is_demo:
self._start_polling()
def on_unmount(self) -> None:
self._stop_polling()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "mon-start":
self._start_polling()
elif event.button.id == "mon-stop":
self._stop_polling()
def action_toggle_poll(self) -> None:
if self._polling:
self._stop_polling()
else:
self._start_polling()
def _start_polling(self) -> None:
if self._polling:
return
self._polling = True
self._sample_count = 0
self._peak_snr = 0.0
freq_mhz = float(self.query_one("#mon-freq", Input).value or "1200")
sr_ksps = int(self.query_one("#mon-sr", Input).value or "20000")
rate = float(self.query_one("#mon-rate", Input).value or "5")
self.query_one("#stat-status", Static).update(
"[#506878]Status:[/] [bold #00d4aa]Running[/]"
)
self._poll_worker = self._do_poll(freq_mhz, sr_ksps, rate)
def _stop_polling(self) -> None:
self._polling = False
if self._poll_worker is not None:
self._poll_worker.cancel()
self._poll_worker = None
try:
self.query_one("#stat-status", Static).update(
"[#506878]Status:[/] [#e8a020]Stopped[/]"
)
except Exception:
pass
@staticmethod
def _parse_input(input_widget: Input, default: float) -> float:
try:
return float(input_widget.value)
except (ValueError, TypeError):
return default
@work(thread=True)
def _do_poll(self, freq_mhz: float, sr_ksps: int, rate: float) -> None:
"""Background worker that polls signal_monitor() in a thread."""
import time
interval = 1.0 / max(0.5, rate)
freq_khz = int(freq_mhz * 1000)
sr_sps = sr_ksps * 1000
# Initial tune
try:
self._bridge.ensure_booted()
self._bridge.tune(sr_sps, freq_khz, 0, 5)
time.sleep(0.3)
except Exception:
pass
while self._polling:
t0 = time.monotonic()
try:
sig = self._bridge.signal_monitor()
except Exception:
time.sleep(interval)
continue
self._sample_count += 1
snr_db = sig.get("snr_db", 0.0)
self._peak_snr = max(self._peak_snr, snr_db)
# Post updates to the UI thread
self.app.call_from_thread(self._update_ui, sig)
elapsed = time.monotonic() - t0
sleep = interval - elapsed
if sleep > 0:
time.sleep(sleep)
def _update_ui(self, sig: dict) -> None:
"""Called from the main thread to update widgets."""
if not self.is_mounted:
return
self.query_one("#monitor-gauge", SignalGauge).update_signal(sig)
self.query_one("#snr-sparkline", SparklineWidget).push(sig.get("snr_db", 0))
self.query_one("#power-sparkline", SparklineWidget).push(sig.get("power_db", -40))
self.query_one("#stat-samples", Static).update(
f"[#506878]Samples:[/] [#00d4aa]{self._sample_count}[/]"
)
self.query_one("#stat-peak", Static).update(
f"[#506878]Peak SNR:[/] [#00d4aa]{self._peak_snr:.1f} dB[/]"
)

View File

@ -0,0 +1,262 @@
"""Scan screen — automated transponder discovery.
Multi-phase pipeline: coarse sweep peak detection fine sweep blind scan.
Shows progress, spectrum visualization, and a results table.
"""
import struct
import time
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Label, Input, Button, Static, ProgressBar
from textual import work
from textual.worker import Worker
from skywalker_tui.widgets.spectrum_plot import SpectrumPlot
from skywalker_tui.widgets.frequency_table import FrequencyTable
class ScanScreen(Screen):
"""Multi-phase transponder scanner with progress and results table."""
DEFAULT_CSS = """
ScanScreen {
layout: vertical;
}
ScanScreen #scan-main {
height: 1fr;
layout: vertical;
}
ScanScreen #scan-upper {
height: 1fr;
layout: horizontal;
}
ScanScreen #scan-spectrum-col {
width: 1fr;
}
ScanScreen #scan-results-col {
width: 1fr;
}
ScanScreen #scan-progress {
height: auto;
padding: 1 2;
background: #0e1018;
layout: vertical;
}
ScanScreen #scan-progress-row {
height: 3;
layout: horizontal;
}
ScanScreen #scan-progress Static {
width: auto;
margin: 0 1 0 0;
}
ScanScreen #scan-progress ProgressBar {
width: 1fr;
margin: 0 1;
}
ScanScreen #scan-controls {
height: auto;
padding: 1 2;
background: #0e1018;
border-top: solid #1a2a3a;
layout: horizontal;
}
ScanScreen #scan-controls Label {
width: auto;
margin: 1 1 0 0;
color: #506878;
}
ScanScreen #scan-controls Input {
width: 10;
margin: 0 1;
}
ScanScreen #scan-controls Button {
margin: 0 1;
}
"""
def __init__(self, bridge, **kwargs):
super().__init__(**kwargs)
self._bridge = bridge
self._scanning = False
self._scan_worker: Worker | None = None
def compose(self) -> ComposeResult:
with Vertical(id="scan-main"):
with Horizontal(id="scan-upper"):
with Vertical(id="scan-spectrum-col"):
yield SpectrumPlot(title="Coarse Sweep", id="scan-spectrum")
with Vertical(id="scan-results-col"):
yield Static("[#00d4aa bold]Transponders Found[/]",
id="scan-results-title")
yield FrequencyTable(id="scan-table")
with Vertical(id="scan-progress"):
yield Static("[#506878]Ready[/]", id="scan-phase")
with Horizontal(id="scan-progress-row"):
yield ProgressBar(total=100, show_eta=False, id="scan-pbar")
with Horizontal(id="scan-controls"):
yield Label("Start:")
yield Input("950", id="scan-start")
yield Label("Stop:")
yield Input("2150", id="scan-stop")
yield Label("LNB LO:")
yield Input("9750", id="scan-lnb")
yield Label("Threshold:")
yield Input("3", id="scan-thresh")
yield Button("Scan", id="scan-start-btn", variant="success")
yield Button("Stop", id="scan-stop-btn", variant="error")
def on_unmount(self) -> None:
self._stop_scan()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "scan-start-btn":
self._start_scan()
elif event.button.id == "scan-stop-btn":
self._stop_scan()
def _start_scan(self) -> None:
if self._scanning:
return
self._scanning = True
start = float(self.query_one("#scan-start", Input).value or "950")
stop = float(self.query_one("#scan-stop", Input).value or "2150")
lnb_lo = float(self.query_one("#scan-lnb", Input).value or "9750")
threshold = float(self.query_one("#scan-thresh", Input).value or "3")
# Clear previous results
self.query_one("#scan-table", FrequencyTable).clear_table()
self._scan_worker = self._do_scan(start, stop, lnb_lo, threshold)
def _stop_scan(self) -> None:
self._scanning = False
if self._scan_worker:
self._scan_worker.cancel()
self._scan_worker = None
@work(thread=True)
def _do_scan(self, start: float, stop: float, lnb_lo: float,
threshold: float) -> None:
"""Multi-phase scan pipeline in a background thread."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "tools"))
from skywalker_lib import detect_peaks, if_to_rf
try:
self._bridge.ensure_booted()
except Exception:
pass
# Phase 1: Coarse sweep
self.app.call_from_thread(self._set_phase, "Phase 1: Coarse sweep", 0)
coarse_step = 10
total_steps = max(1, int((stop - start) / coarse_step) + 1)
def coarse_cb(freq, step_num, total, result):
pct = (step_num + 1) / total * 100
self.app.call_from_thread(self._set_progress, pct)
freqs, powers, results = self._bridge.sweep_spectrum(
start, stop, coarse_step, dwell_ms=15, sr_ksps=20000,
callback=coarse_cb,
)
if not self._scanning:
return
self.app.call_from_thread(self._update_spectrum, freqs, powers, results, lnb_lo)
# Phase 2: Peak detection
self.app.call_from_thread(self._set_phase, "Phase 2: Peak detection", 50)
peaks = detect_peaks(freqs, powers, threshold_db=threshold)
if not peaks:
self.app.call_from_thread(self._set_phase, "No peaks found", 100)
self._scanning = False
return
# Phase 3: Fine sweep around peaks
self.app.call_from_thread(
self._set_phase,
f"Phase 3: Fine sweep ({len(peaks)} peaks)", 60,
)
refined = []
for i, (freq, pwr, idx) in enumerate(peaks):
if not self._scanning:
return
fine_start = max(start, freq - 15)
fine_stop = min(stop, freq + 15)
fine_freqs, fine_powers, fine_results = self._bridge.sweep_spectrum(
fine_start, fine_stop, step_mhz=2.0, dwell_ms=20, sr_ksps=20000,
)
if fine_powers:
best_idx = fine_powers.index(max(fine_powers))
refined.append((
fine_freqs[best_idx], fine_powers[best_idx],
fine_results[best_idx],
))
pct = 60 + (i + 1) / len(peaks) * 20
self.app.call_from_thread(self._set_progress, pct)
# Phase 4: Blind scan
self.app.call_from_thread(
self._set_phase,
f"Phase 4: Blind scan ({len(refined)} candidates)", 80,
)
sr_min = 1000 * 1000
sr_max = 30000 * 1000
sr_step = 500 * 1000
for i, (freq, pwr, result) in enumerate(refined):
if not self._scanning:
return
freq_khz = int(freq * 1000)
bs_result = self._bridge.blind_scan(freq_khz, sr_min, sr_max, sr_step)
if bs_result and bs_result.get("locked"):
tp = {
"if_mhz": bs_result.get("freq_khz", freq_khz) / 1000.0,
"rf_mhz": if_to_rf(
bs_result.get("freq_khz", freq_khz) / 1000.0, lnb_lo
),
"sr_ksps": bs_result.get("sr_sps", 0) // 1000,
"power_db": pwr,
"locked": True,
}
self.app.call_from_thread(self._add_transponder, tp)
pct = 80 + (i + 1) / len(refined) * 20
self.app.call_from_thread(self._set_progress, pct)
self.app.call_from_thread(self._set_phase, "Scan complete", 100)
self._scanning = False
def _set_phase(self, text: str, progress: float) -> None:
if not self.is_mounted:
return
self.query_one("#scan-phase", Static).update(f"[#00d4aa]{text}[/]")
self.query_one("#scan-pbar", ProgressBar).update(progress=progress)
def _set_progress(self, pct: float) -> None:
if not self.is_mounted:
return
self.query_one("#scan-pbar", ProgressBar).update(progress=pct)
def _update_spectrum(self, freqs, powers, results, lnb_lo) -> None:
if not self.is_mounted:
return
self.query_one("#scan-spectrum", SpectrumPlot).update_data(
freqs, powers, results, lnb_lo=lnb_lo,
)
def _add_transponder(self, tp: dict) -> None:
if not self.is_mounted:
return
self.query_one("#scan-table", FrequencyTable).add_transponder(tp)

View File

@ -0,0 +1,193 @@
"""Spectrum screen — sweep analyzer across the IF range.
Displays a bar chart of power vs. frequency, optionally with a rolling
waterfall beneath it. Uses threaded workers for the blocking USB sweep.
"""
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Label, Input, Button, Static, ProgressBar, Checkbox
from textual import work
from textual.worker import Worker
from skywalker_tui.widgets.spectrum_plot import SpectrumPlot
from skywalker_tui.widgets.waterfall import WaterfallDisplay
class SpectrumScreen(Screen):
"""Spectrum analyzer with bar chart and optional waterfall."""
DEFAULT_CSS = """
SpectrumScreen {
layout: vertical;
}
SpectrumScreen #spec-main {
height: 1fr;
layout: vertical;
}
SpectrumScreen #spec-progress-row {
height: 3;
layout: horizontal;
padding: 0 2;
background: #0e1018;
}
SpectrumScreen #spec-progress-row Static {
width: auto;
margin: 1 1 0 0;
}
SpectrumScreen #spec-progress-row ProgressBar {
width: 1fr;
margin: 1 1 0 0;
}
SpectrumScreen #spec-controls {
height: auto;
padding: 1 2;
background: #0e1018;
border-top: solid #1a2a3a;
layout: horizontal;
}
SpectrumScreen #spec-controls Label {
width: auto;
margin: 1 1 0 0;
color: #506878;
}
SpectrumScreen #spec-controls Input {
width: 10;
margin: 0 1;
}
SpectrumScreen #spec-controls Button {
margin: 0 1;
}
SpectrumScreen #spec-controls Checkbox {
margin: 1 1 0 0;
}
"""
def __init__(self, bridge, **kwargs):
super().__init__(**kwargs)
self._bridge = bridge
self._sweeping = False
self._sweep_worker: Worker | None = None
def compose(self) -> ComposeResult:
with Vertical(id="spec-main"):
yield SpectrumPlot(title="Spectrum Analyzer", id="spec-plot")
yield WaterfallDisplay(title="Waterfall", id="spec-waterfall")
with Horizontal(id="spec-progress-row"):
yield Static("[#506878]Ready[/]", id="spec-status")
yield ProgressBar(total=100, show_eta=False, id="spec-pbar")
with Horizontal(id="spec-controls"):
yield Label("Start:")
yield Input("950", id="spec-start")
yield Label("Stop:")
yield Input("2150", id="spec-stop")
yield Label("Step:")
yield Input("5", id="spec-step")
yield Label("Dwell:")
yield Input("10", id="spec-dwell")
yield Label("LNB LO:")
yield Input("0", id="spec-lnb")
yield Checkbox("Continuous", id="spec-continuous")
yield Button("Sweep", id="spec-sweep-btn", variant="success")
yield Button("Stop", id="spec-stop-btn", variant="error")
def on_mount(self) -> None:
if self._bridge.is_demo:
self._start_sweep()
def on_unmount(self) -> None:
self._stop_sweep()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "spec-sweep-btn":
self._start_sweep()
elif event.button.id == "spec-stop-btn":
self._stop_sweep()
def _start_sweep(self) -> None:
if self._sweeping:
return
self._sweeping = True
start = float(self.query_one("#spec-start", Input).value or "950")
stop = float(self.query_one("#spec-stop", Input).value or "2150")
step = float(self.query_one("#spec-step", Input).value or "5")
dwell = int(self.query_one("#spec-dwell", Input).value or "10")
lnb_lo = float(self.query_one("#spec-lnb", Input).value or "0")
continuous = self.query_one("#spec-continuous", Checkbox).value
self._sweep_worker = self._do_sweep(start, stop, step, dwell, lnb_lo, continuous)
def _stop_sweep(self) -> None:
self._sweeping = False
if self._sweep_worker:
self._sweep_worker.cancel()
self._sweep_worker = None
@work(thread=True)
def _do_sweep(self, start: float, stop: float, step: float,
dwell: int, lnb_lo: float, continuous: bool) -> None:
"""Background sweep worker."""
import time
try:
self._bridge.ensure_booted()
except Exception:
pass
sweep_num = 0
while self._sweeping:
sweep_num += 1
total_steps = max(1, int((stop - start) / step) + 1)
step_count = [0]
def progress_cb(freq, step_num, total, result):
step_count[0] = step_num + 1
pct = (step_num + 1) / total * 100
self.app.call_from_thread(self._update_progress, pct, freq, sweep_num)
self.app.call_from_thread(self._update_status, f"Sweeping #{sweep_num}...")
freqs, powers, results = self._bridge.sweep_spectrum(
start, stop, step, dwell, sr_ksps=20000, callback=progress_cb,
)
self.app.call_from_thread(self._update_plot, freqs, powers, results, lnb_lo)
self.app.call_from_thread(self._update_waterfall, powers)
if not continuous:
break
time.sleep(0.1)
self._sweeping = False
self.app.call_from_thread(self._update_status, "Complete")
def _update_progress(self, pct: float, freq: float, sweep_num: int) -> None:
if not self.is_mounted:
return
self.query_one("#spec-pbar", ProgressBar).update(progress=pct)
self.query_one("#spec-status", Static).update(
f"[#00d4aa]Sweep #{sweep_num}[/] [#506878]{freq:.0f} MHz[/]"
)
def _update_status(self, msg: str) -> None:
if not self.is_mounted:
return
self.query_one("#spec-status", Static).update(f"[#506878]{msg}[/]")
self.query_one("#spec-pbar", ProgressBar).update(progress=0)
def _update_plot(self, freqs, powers, results, lnb_lo) -> None:
if not self.is_mounted:
return
self.query_one("#spec-plot", SpectrumPlot).update_data(
freqs, powers, results, lnb_lo=lnb_lo,
)
def _update_waterfall(self, powers) -> None:
if not self.is_mounted:
return
self.query_one("#spec-waterfall", WaterfallDisplay).add_sweep(powers)

View File

@ -0,0 +1,301 @@
"""Track screen — carrier/beacon tracker with logging and export.
Locks to a single frequency and records SNR, power, lock state over time.
Displays dual sparklines, an event log for lock transitions, and stats.
Supports CSV/JSONL export.
"""
import csv
import json
import time
from datetime import datetime
from pathlib import Path
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Label, Input, Button, Static, RichLog
from textual import work
from textual.worker import Worker
from skywalker_tui.widgets.signal_gauge import SignalGauge
from skywalker_tui.widgets.sparkline_widget import SparklineWidget
class TrackScreen(Screen):
"""Long-running carrier tracker with event log and export."""
DEFAULT_CSS = """
TrackScreen {
layout: vertical;
}
TrackScreen #track-main {
height: 1fr;
layout: vertical;
padding: 1 2;
}
TrackScreen #track-upper {
height: auto;
layout: horizontal;
}
TrackScreen #track-gauge-col {
width: 1fr;
}
TrackScreen #track-sparklines {
width: 1fr;
layout: vertical;
}
TrackScreen #track-log-container {
height: 10;
background: #0e1018;
border: round #1a2a3a;
margin: 1 0;
}
TrackScreen #track-log-title {
height: 1;
color: #00d4aa;
text-style: bold;
padding: 0 1;
}
TrackScreen #track-log {
height: 1fr;
}
TrackScreen #track-stats {
height: 3;
layout: horizontal;
}
TrackScreen #track-stats Static {
width: 1fr;
height: 3;
content-align: center middle;
background: #121c2a;
border: round #1a3050;
margin: 0 1 0 0;
}
TrackScreen #track-controls {
height: auto;
padding: 1 2;
background: #0e1018;
border-top: solid #1a2a3a;
layout: horizontal;
}
TrackScreen #track-controls Label {
width: auto;
margin: 1 1 0 0;
color: #506878;
}
TrackScreen #track-controls Input {
width: 14;
margin: 0 1;
}
TrackScreen #track-controls Button {
margin: 0 1;
}
"""
def __init__(self, bridge, **kwargs):
super().__init__(**kwargs)
self._bridge = bridge
self._tracking = False
self._track_worker: Worker | None = None
self._sample_count = 0
self._peak_snr = 0.0
self._start_time = 0.0
self._was_locked: bool | None = None
self._records: list[dict] = []
def compose(self) -> ComposeResult:
with Vertical(id="track-main"):
with Horizontal(id="track-upper"):
with Vertical(id="track-gauge-col"):
yield SignalGauge(id="track-gauge")
with Vertical(id="track-sparklines"):
yield SparklineWidget(title="SNR (dB)", color="#00d4aa",
id="track-snr-spark")
yield SparklineWidget(title="Power (dB)", color="#2196f3",
id="track-power-spark")
with Vertical(id="track-log-container"):
yield Static("[#00d4aa bold]Event Log[/]", id="track-log-title")
yield RichLog(id="track-log", wrap=True, markup=True)
with Horizontal(id="track-stats"):
yield Static("[#506878]Samples:[/] [#00d4aa]0[/]", id="trk-samples")
yield Static("[#506878]Elapsed:[/] [#00d4aa]0s[/]", id="trk-elapsed")
yield Static("[#506878]Peak SNR:[/] [#00d4aa]0.0 dB[/]", id="trk-peak")
yield Static("[#506878]Status:[/] [#e8a020]Stopped[/]", id="trk-status")
with Horizontal(id="track-controls"):
yield Label("Freq (MHz):")
yield Input("1200", id="trk-freq")
yield Label("SR (ksps):")
yield Input("20000", id="trk-sr")
yield Label("Rate (Hz):")
yield Input("1", id="trk-rate")
yield Button("Start", id="trk-start-btn", variant="success")
yield Button("Stop", id="trk-stop-btn", variant="error")
yield Button("Export CSV", id="trk-csv-btn")
yield Button("Export JSONL", id="trk-jsonl-btn")
def on_mount(self) -> None:
if self._bridge.is_demo:
self._start_tracking()
def on_unmount(self) -> None:
self._stop_tracking()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "trk-start-btn":
self._start_tracking()
elif event.button.id == "trk-stop-btn":
self._stop_tracking()
elif event.button.id == "trk-csv-btn":
self._export_csv()
elif event.button.id == "trk-jsonl-btn":
self._export_jsonl()
def _start_tracking(self) -> None:
if self._tracking:
return
self._tracking = True
self._sample_count = 0
self._peak_snr = 0.0
self._was_locked = None
self._records.clear()
self._start_time = time.monotonic()
freq = float(self.query_one("#trk-freq", Input).value or "1200")
sr = int(self.query_one("#trk-sr", Input).value or "20000")
rate = float(self.query_one("#trk-rate", Input).value or "1")
self.query_one("#trk-status", Static).update(
"[#506878]Status:[/] [bold #00d4aa]Tracking[/]"
)
log = self.query_one("#track-log", RichLog)
log.clear()
log.write("[#506878]Tracking started[/]")
self._track_worker = self._do_track(freq, sr, rate)
def _stop_tracking(self) -> None:
self._tracking = False
if self._track_worker:
self._track_worker.cancel()
self._track_worker = None
try:
self.query_one("#trk-status", Static).update(
"[#506878]Status:[/] [#e8a020]Stopped[/]"
)
log = self.query_one("#track-log", RichLog)
log.write(f"[#506878]Stopped. {self._sample_count} samples.[/]")
except Exception:
pass
@work(thread=True)
def _do_track(self, freq_mhz: float, sr_ksps: int, rate: float) -> None:
"""Background tracking loop."""
interval = 1.0 / max(0.1, rate)
freq_khz = int(freq_mhz * 1000)
sr_sps = sr_ksps * 1000
try:
self._bridge.ensure_booted()
self._bridge.tune(sr_sps, freq_khz, 0, 5)
time.sleep(0.3)
except Exception:
pass
while self._tracking:
t0 = time.monotonic()
try:
sig = self._bridge.signal_monitor()
except Exception:
time.sleep(interval)
continue
self._sample_count += 1
snr_db = sig.get("snr_db", 0.0)
locked = sig.get("locked", False)
self._peak_snr = max(self._peak_snr, snr_db)
elapsed = time.monotonic() - self._start_time
record = {
"ts": datetime.now().isoformat(),
"elapsed": round(elapsed, 3),
"snr_db": round(snr_db, 2),
"agc1": sig.get("agc1", 0),
"agc2": sig.get("agc2", 0),
"power_db": round(sig.get("power_db", -40), 2),
"locked": locked,
}
self._records.append(record)
# Lock transition detection
lock_event = None
if self._was_locked is not None and locked != self._was_locked:
if locked:
lock_event = ("lock", snr_db)
else:
lock_event = ("unlock", snr_db)
self._was_locked = locked
self.app.call_from_thread(
self._update_ui, sig, elapsed, lock_event,
)
sleep = interval - (time.monotonic() - t0)
if sleep > 0:
time.sleep(sleep)
def _update_ui(self, sig: dict, elapsed: float,
lock_event: tuple | None) -> None:
if not self.is_mounted:
return
self.query_one("#track-gauge", SignalGauge).update_signal(sig)
self.query_one("#track-snr-spark", SparklineWidget).push(sig.get("snr_db", 0))
self.query_one("#track-power-spark", SparklineWidget).push(sig.get("power_db", -40))
self.query_one("#trk-samples", Static).update(
f"[#506878]Samples:[/] [#00d4aa]{self._sample_count}[/]"
)
self.query_one("#trk-elapsed", Static).update(
f"[#506878]Elapsed:[/] [#00d4aa]{elapsed:.0f}s[/]"
)
self.query_one("#trk-peak", Static).update(
f"[#506878]Peak SNR:[/] [#00d4aa]{self._peak_snr:.1f} dB[/]"
)
if lock_event:
ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
log = self.query_one("#track-log", RichLog)
if lock_event[0] == "lock":
log.write(
f"[#506878]{ts}[/] [bold #00e060]LOCK ACQUIRED[/]"
f" SNR {lock_event[1]:.1f} dB"
)
else:
log.write(
f"[#506878]{ts}[/] [bold #e04040]LOCK LOST[/]"
)
def _export_csv(self) -> None:
if not self._records:
return
path = Path(f"skywalker-track-{datetime.now().strftime('%Y%m%d-%H%M%S')}.csv")
with open(path, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=list(self._records[0].keys()))
w.writeheader()
w.writerows(self._records)
log = self.query_one("#track-log", RichLog)
log.write(f"[#00d4aa]CSV exported: {path}[/]")
def _export_jsonl(self) -> None:
if not self._records:
return
path = Path(f"skywalker-track-{datetime.now().strftime('%Y%m%d-%H%M%S')}.jsonl")
with open(path, "w") as f:
for rec in self._records:
f.write(json.dumps(rec) + "\n")
log = self.query_one("#track-log", RichLog)
log.write(f"[#00d4aa]JSONL exported: {path}[/]")

View File

@ -0,0 +1,305 @@
/* SkyWalker-1 TUI — dark RF theme
*
* Design: dark background with teal/cyan data accents.
* Signal gradient: blue → green → yellow → red (cold → hot).
* No purple per user preference.
*/
/* ─── Global ─── */
Screen {
background: #0a0a12;
color: #c8d0d8;
}
Header {
background: #0e1018;
color: #00d4aa;
dock: top;
}
Footer {
background: #0e1018;
dock: bottom;
}
/* ─── Sidebar ─── */
#sidebar {
width: 26;
background: #0e1420;
border-right: solid #1a2a3a;
padding: 1 1;
}
#sidebar .mode-button {
width: 100%;
margin: 0 0 1 0;
min-height: 3;
background: #121c2a;
color: #7090a8;
border: round #1a3050;
text-align: center;
}
#sidebar .mode-button:hover {
background: #1a2a40;
color: #00d4aa;
border: round #00d4aa;
}
#sidebar .mode-button.-active {
background: #0a2a3a;
color: #00d4aa;
border: round #00d4aa;
text-style: bold;
}
#sidebar Label.sidebar-heading {
color: #506878;
text-style: bold;
margin: 1 0 0 0;
text-align: center;
}
/* ─── Content area ─── */
#content-area {
background: #0a0a12;
}
/* ─── Status bar widget ─── */
#device-status {
height: 3;
background: #0e1420;
border-top: solid #1a2a3a;
padding: 0 1;
dock: bottom;
}
#device-status .status-label {
color: #506878;
}
#device-status .status-value {
color: #00d4aa;
}
#device-status .status-connected {
color: #00d4aa;
text-style: bold;
}
#device-status .status-demo {
color: #e8a020;
text-style: bold;
}
#device-status .status-disconnected {
color: #e04040;
text-style: bold;
}
/* ─── Signal gauge ─── */
.signal-gauge {
height: auto;
padding: 1;
}
.signal-gauge .snr-value {
color: #00d4aa;
text-style: bold;
}
.signal-gauge .lock-yes {
color: #00e060;
text-style: bold;
}
.signal-gauge .lock-no {
color: #e04040;
}
/* ─── Spectrum plot ─── */
.spectrum-plot {
min-height: 12;
}
/* ─── Panels and containers ─── */
.panel {
background: #0e1420;
border: round #1a2a3a;
padding: 1;
margin: 0 0 1 0;
}
.panel-title {
color: #00d4aa;
text-style: bold;
margin: 0 0 1 0;
}
/* ─── Controls / Input areas ─── */
.controls {
height: auto;
padding: 1;
background: #0e1018;
border-top: solid #1a2a3a;
dock: bottom;
}
.controls Label {
color: #506878;
width: auto;
margin: 0 1 0 0;
}
.controls Input {
width: 14;
background: #121c2a;
border: round #1a3050;
color: #c8d0d8;
}
.controls Input:focus {
border: round #00d4aa;
}
.controls Button {
margin: 0 1;
background: #1a2a40;
color: #00d4aa;
border: round #1a3050;
}
.controls Button:hover {
background: #00d4aa;
color: #0a0a12;
}
/* ─── Data table ─── */
DataTable {
background: #0a0a12;
}
DataTable > .datatable--header {
background: #0e1420;
color: #00d4aa;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #1a2a40;
color: #ffffff;
}
/* ─── Progress bar ─── */
ProgressBar Bar {
color: #00d4aa;
background: #121c2a;
}
/* ─── Sparkline ─── */
.sparkline-widget {
height: 3;
padding: 0 1;
}
/* ─── Waterfall ─── */
.waterfall {
min-height: 10;
}
/* ─── Log / event list ─── */
#event-log {
height: 8;
background: #0e1018;
border: round #1a2a3a;
padding: 0 1;
overflow-y: auto;
}
#event-log .log-lock {
color: #00e060;
}
#event-log .log-unlock {
color: #e04040;
}
#event-log .log-time {
color: #506878;
}
/* ─── Stats panel ─── */
.stats-grid {
layout: grid;
grid-size: 4;
grid-gutter: 1;
height: auto;
padding: 1;
}
.stat-box {
height: 3;
background: #121c2a;
border: round #1a3050;
padding: 0 1;
content-align: center middle;
}
.stat-box .stat-value {
color: #00d4aa;
text-style: bold;
}
.stat-box .stat-label {
color: #506878;
}
/* ─── L-band allocation ─── */
.alloc-tag {
background: #1a2a40;
color: #60a0c0;
padding: 0 1;
margin: 0 1 0 0;
}
/* ─── Mode-specific screen layouts ─── */
.mode-screen {
layout: vertical;
}
.top-panel {
height: 1fr;
min-height: 10;
}
.bottom-panel {
height: auto;
}
.split-horizontal {
layout: horizontal;
}
.left-panel {
width: 1fr;
}
.right-panel {
width: 1fr;
}

View File

@ -0,0 +1 @@
"""Custom widgets for SkyWalker-1 TUI."""

View File

@ -0,0 +1,81 @@
"""DataTable wrapper for transponder scan results."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "tools"))
from textual.widget import Widget
from textual.widgets import DataTable
from textual.app import ComposeResult
from skywalker_lib import LBAND_ALLOCATIONS
def _freq_allocation(freq_mhz: float) -> str:
"""Return allocation name for a given frequency."""
for lo, hi, name in LBAND_ALLOCATIONS:
if lo <= freq_mhz <= hi:
return name
return ""
class FrequencyTable(Widget):
"""Sortable table of discovered transponders or sweep results."""
DEFAULT_CSS = """
FrequencyTable {
height: 1fr;
min-height: 6;
}
"""
def __init__(self, show_allocation: bool = False, **kwargs):
super().__init__(**kwargs)
self._show_allocation = show_allocation
self._rows: list[dict] = []
def compose(self) -> ComposeResult:
table = DataTable(id="freq-table")
table.cursor_type = "row"
yield table
def on_mount(self) -> None:
table = self.query_one("#freq-table", DataTable)
cols = ["IF MHz", "RF MHz", "SR ksps", "Power dB", "Locked"]
if self._show_allocation:
cols.append("Allocation")
for col in cols:
table.add_column(col, key=col)
def add_transponder(self, tp: dict) -> None:
"""Add a single transponder result."""
self._rows.append(tp)
table = self.query_one("#freq-table", DataTable)
if_mhz = tp.get("if_mhz", 0)
rf_mhz = tp.get("rf_mhz", 0)
sr_ksps = tp.get("sr_ksps", 0)
power_db = tp.get("power_db", 0)
locked = "Yes" if tp.get("locked", False) else "No"
row = [f"{if_mhz:.1f}", f"{rf_mhz:.0f}", str(sr_ksps),
f"{power_db:.1f}", locked]
if self._show_allocation:
row.append(_freq_allocation(if_mhz))
table.add_row(*row)
def clear_table(self) -> None:
"""Remove all rows."""
self._rows.clear()
table = self.query_one("#freq-table", DataTable)
table.clear()
def get_selected_transponder(self) -> dict | None:
"""Return the currently selected transponder dict."""
table = self.query_one("#freq-table", DataTable)
cursor_row = table.cursor_row
if cursor_row is not None and 0 <= cursor_row < len(self._rows):
return self._rows[cursor_row]
return None

View File

@ -0,0 +1,120 @@
"""Large signal strength gauge with SNR bar and lock indicator."""
from textual.app import ComposeResult
from textual.widget import Widget
from textual.widgets import Label, Static
from textual.reactive import reactive
# Bar characters for sub-block resolution
_BARS = " ▏▎▍▌▋▊▉█"
def _snr_color(snr_db: float) -> str:
"""Map SNR to a hex color: blue → cyan → green → yellow → red."""
if snr_db < 2:
return "#1565c0"
elif snr_db < 4:
return "#0097a7"
elif snr_db < 6:
return "#00bfa5"
elif snr_db < 8:
return "#00d4aa"
elif snr_db < 10:
return "#4caf50"
elif snr_db < 12:
return "#8bc34a"
elif snr_db < 14:
return "#cddc39"
elif snr_db < 16:
return "#ffc107"
else:
return "#f44336"
def _build_bar(pct: float, width: int = 40) -> str:
"""Build a Unicode block bar string with sub-character precision."""
pct = max(0.0, min(100.0, pct))
ratio = pct / 100.0
full = int(ratio * width)
remainder = (ratio * width) - full
partial = int(remainder * (len(_BARS) - 1))
bar = "" * full
if full < width:
bar += _BARS[partial]
bar += " " * (width - full - 1)
return bar
class SignalGauge(Widget):
"""Large signal strength display with SNR, power, and lock state."""
DEFAULT_CSS = """
SignalGauge {
height: auto;
padding: 1 2;
background: #0e1420;
border: round #1a2a3a;
margin: 0 0 1 0;
}
SignalGauge #gauge-header {
height: 1;
margin: 0 0 1 0;
}
SignalGauge #gauge-bar-line {
height: 1;
}
SignalGauge #gauge-details {
height: 1;
margin: 1 0 0 0;
color: #506878;
}
"""
snr_db = reactive(0.0)
snr_pct = reactive(0.0)
power_db = reactive(-40.0)
locked = reactive(False)
agc1 = reactive(0)
def compose(self) -> ComposeResult:
yield Static("", id="gauge-header")
yield Static("", id="gauge-bar-line")
yield Static("", id="gauge-details")
def watch_snr_db(self) -> None:
self._refresh_display()
def watch_locked(self) -> None:
self._refresh_display()
def update_signal(self, sig: dict) -> None:
"""Update from a signal_monitor() result dict."""
self.snr_db = sig.get("snr_db", 0.0)
self.snr_pct = sig.get("snr_pct", 0.0)
self.power_db = sig.get("power_db", -40.0)
self.locked = sig.get("locked", False)
self.agc1 = sig.get("agc1", 0)
def _refresh_display(self) -> None:
if not self.is_mounted:
return
color = _snr_color(self.snr_db)
lock_str = "[bold #00e060]LOCK[/]" if self.locked else "[#e04040]NO LOCK[/]"
header = self.query_one("#gauge-header", Static)
header.update(
f" {lock_str} "
f"[bold {color}]{self.snr_db:6.1f} dB[/] "
f"[#506878]{self.snr_pct:5.1f}%[/]"
)
bar_str = _build_bar(self.snr_pct, width=50)
bar_line = self.query_one("#gauge-bar-line", Static)
bar_line.update(f" [{color}]{bar_str}[/]")
details = self.query_one("#gauge-details", Static)
details.update(
f" Power: {self.power_db:6.1f} dB AGC: {self.agc1:5d}"
)

View File

@ -0,0 +1,71 @@
"""Rolling sparkline time series widget using Unicode spark characters."""
from collections import deque
from textual.widget import Widget
from textual.widgets import Static
from textual.app import ComposeResult
_SPARKS = "▁▂▃▄▅▆▇█"
class SparklineWidget(Widget):
"""Rolling time-series display using Unicode block characters."""
DEFAULT_CSS = """
SparklineWidget {
height: 3;
padding: 0 1;
background: #0e1420;
border: round #1a2a3a;
margin: 0 0 1 0;
}
SparklineWidget #spark-label {
height: 1;
color: #506878;
}
SparklineWidget #spark-line {
height: 1;
}
"""
def __init__(self, title: str = "History", max_width: int = 80,
color: str = "#00d4aa", **kwargs):
super().__init__(**kwargs)
self._title = title
self._max_width = max_width
self._color = color
self._values: deque[float] = deque(maxlen=max_width)
def compose(self) -> ComposeResult:
yield Static(f"[#506878]{self._title}[/]", id="spark-label")
yield Static("", id="spark-line")
def push(self, value: float) -> None:
"""Add a new data point and refresh the display."""
self._values.append(value)
self._refresh()
def clear(self) -> None:
self._values.clear()
if self.is_mounted:
self.query_one("#spark-line", Static).update("")
def _refresh(self) -> None:
if not self.is_mounted or not self._values:
return
vals = list(self._values)
mn = min(vals)
mx = max(vals)
rng = mx - mn if mx != mn else 1.0
chars = []
for v in vals:
idx = int((v - mn) / rng * (len(_SPARKS) - 1))
idx = max(0, min(len(_SPARKS) - 1, idx))
chars.append(_SPARKS[idx])
spark_str = "".join(chars)
line = self.query_one("#spark-line", Static)
line.update(f"[{self._color}]{spark_str}[/] [{mn:.1f} .. {mx:.1f}]")

View File

@ -0,0 +1,155 @@
"""Terminal-native spectrum plot using Unicode block characters and Rich markup.
Renders a horizontal bar chart where each frequency bin gets a colored bar
proportional to its power level. The color gradient goes from cold (blue)
to hot (red), same concept as the CLI tool's WATERFALL_COLORS but using
Rich style strings instead of raw ANSI escapes.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "tools"))
from textual.widget import Widget
from textual.widgets import Static
from textual.app import ComposeResult
from textual.containers import VerticalScroll
from skywalker_lib import detect_peaks, if_to_rf
# Sub-block characters for fractional bar width
_BARS = " ▏▎▍▌▋▊▉█"
# Power-to-color gradient (16 steps: blue → cyan → green → yellow → red)
_POWER_COLORS = [
"#1a237e", # dark blue (weakest)
"#1565c0",
"#0277bd",
"#00838f",
"#00897b", # teal
"#2e7d32", # green
"#558b2f",
"#9e9d24",
"#f9a825", # yellow
"#ff8f00",
"#ef6c00", # orange
"#e65100",
"#d84315",
"#c62828", # red
"#b71c1c",
"#880e0e", # dark red (strongest)
]
def _power_to_color(power_db: float, floor: float, ceiling: float) -> str:
"""Map a power value to a color from the gradient."""
if ceiling == floor:
return _POWER_COLORS[len(_POWER_COLORS) // 2]
ratio = (power_db - floor) / (ceiling - floor)
ratio = max(0.0, min(1.0, ratio))
idx = int(ratio * (len(_POWER_COLORS) - 1))
return _POWER_COLORS[idx]
class SpectrumPlot(Widget):
"""Bar-chart spectrum display rendered with Unicode blocks and Rich styles."""
DEFAULT_CSS = """
SpectrumPlot {
height: 1fr;
min-height: 12;
background: #0a0a12;
padding: 0 1;
}
SpectrumPlot #spectrum-title {
height: 1;
color: #00d4aa;
text-style: bold;
}
SpectrumPlot #spectrum-body {
height: 1fr;
}
"""
def __init__(self, title: str = "Spectrum", bar_width: int = 40,
lnb_lo: float = 0.0, **kwargs):
super().__init__(**kwargs)
self._title = title
self._bar_width = bar_width
self._lnb_lo = lnb_lo
self._freqs: list[float] = []
self._powers: list[float] = []
self._results: list[dict] = []
def compose(self) -> ComposeResult:
yield Static(f"[#00d4aa bold]{self._title}[/]", id="spectrum-title")
yield VerticalScroll(Static("", id="spectrum-lines"), id="spectrum-body")
def update_data(self, freqs: list[float], powers: list[float],
results: list[dict] | None = None, lnb_lo: float | None = None):
"""Update with new sweep data and redraw."""
self._freqs = freqs
self._powers = powers
self._results = results or [{} for _ in freqs]
if lnb_lo is not None:
self._lnb_lo = lnb_lo
self._refresh()
def _refresh(self) -> None:
if not self.is_mounted or not self._freqs:
return
p_min = min(self._powers)
p_max = max(self._powers)
p_range = p_max - p_min if p_max != p_min else 1.0
# Detect peaks for markers
peaks_set = set()
peaks = detect_peaks(self._freqs, self._powers, threshold_db=3.0)
for _f, _p, idx in peaks:
peaks_set.add(idx)
lines = []
for i, (f, p) in enumerate(zip(self._freqs, self._powers)):
# Frequency label (RF or IF)
if self._lnb_lo > 0:
label = f"{if_to_rf(f, self._lnb_lo):7.0f}"
else:
label = f"{f:7.1f}"
# Bar
ratio = max(0.0, min(1.0, (p - p_min) / p_range))
full = int(ratio * self._bar_width)
remainder = (ratio * self._bar_width) - full
partial = int(remainder * (len(_BARS) - 1))
color = _power_to_color(p, p_min, p_max)
bar = "" * full
if full < self._bar_width:
bar += _BARS[partial]
bar += " " * (self._bar_width - full - 1)
locked = self._results[i].get("locked", False) if i < len(self._results) else False
lock_mark = " [bold #00e060]*[/]" if locked else ""
peak_mark = " [bold #f44336]^[/]" if i in peaks_set else ""
lines.append(
f"[#506878]{label}[/] [{color}]{bar}[/] [#7090a8]{p:6.1f}[/]{lock_mark}{peak_mark}"
)
# Peak summary at bottom
if peaks:
lines.append("")
lines.append(f"[#00d4aa bold]Peaks ({len(peaks)}):[/]")
for freq, pwr, idx in peaks:
if self._lnb_lo > 0:
fl = f"{if_to_rf(freq, self._lnb_lo):.0f} MHz RF"
else:
fl = f"{freq:.1f} MHz"
locked = self._results[idx].get("locked", False) if idx < len(self._results) else False
lock_s = " [bold #00e060]LOCKED[/]" if locked else ""
lines.append(f" [#c8d0d8]{fl} {pwr:.1f} dB{lock_s}[/]")
body = self.query_one("#spectrum-lines", Static)
body.update("\n".join(lines))

View File

@ -0,0 +1,71 @@
"""Device status bar — connection state, firmware version, config bits."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "tools"))
from textual.app import ComposeResult
from textual.containers import Horizontal
from textual.widget import Widget
from textual.widgets import Label
from skywalker_lib import format_config_bits
class DeviceStatusBar(Widget):
"""Bottom status bar showing device connection and configuration."""
DEFAULT_CSS = """
DeviceStatusBar {
height: 3;
background: #0e1420;
border-top: solid #1a2a3a;
padding: 0 1;
dock: bottom;
layout: horizontal;
}
DeviceStatusBar Label {
width: auto;
margin: 1 2 0 0;
}
"""
def __init__(self, bridge=None):
super().__init__(id="device-status")
self._bridge = bridge
def compose(self) -> ComposeResult:
yield Label("", id="status-conn")
yield Label("", id="status-fw")
yield Label("", id="status-config")
def update_status(self, bridge=None):
if bridge is not None:
self._bridge = bridge
if self._bridge is None:
return
conn_label = self.query_one("#status-conn", Label)
fw_label = self.query_one("#status-fw", Label)
config_label = self.query_one("#status-config", Label)
if self._bridge.is_demo:
conn_label.update("[bold #e8a020]DEMO[/]")
else:
conn_label.update("[bold #00d4aa]CONNECTED[/]")
try:
fw = self._bridge.get_fw_version()
fw_label.update(f"[#506878]FW:[/] [#c8d0d8]{fw['version']}[/]")
except Exception:
fw_label.update("[#506878]FW:[/] [#e04040]error[/]")
try:
config = self._bridge.get_config()
bits = format_config_bits(config)
active = [name for name, is_set in bits if is_set]
config_str = " | ".join(active) if active else "idle"
config_label.update(f"[#506878]Config:[/] [#7090a8]{config_str}[/]")
except Exception:
config_label.update("[#506878]Config:[/] [#e04040]error[/]")

View File

@ -0,0 +1,98 @@
"""Rolling waterfall display — each row is one sweep, color = power level.
The waterfall auto-scrolls: new sweeps appear at the top, older rows shift down.
Uses Rich markup with the same 16-color power gradient as the spectrum plot.
"""
from collections import deque
from datetime import datetime
from textual.widget import Widget
from textual.widgets import Static
from textual.app import ComposeResult
from textual.containers import VerticalScroll
# Same gradient as spectrum_plot
_WATERFALL_COLORS = [
"#1a237e", "#1565c0", "#0277bd", "#00838f",
"#00897b", "#2e7d32", "#558b2f", "#9e9d24",
"#f9a825", "#ff8f00", "#ef6c00", "#e65100",
"#d84315", "#c62828", "#b71c1c", "#880e0e",
]
class WaterfallDisplay(Widget):
"""Scrolling waterfall of spectrum sweeps."""
DEFAULT_CSS = """
WaterfallDisplay {
height: 1fr;
min-height: 8;
background: #0a0a12;
padding: 0 1;
}
WaterfallDisplay #waterfall-title {
height: 1;
color: #00d4aa;
text-style: bold;
}
WaterfallDisplay #waterfall-body {
height: 1fr;
}
"""
def __init__(self, title: str = "Waterfall", max_rows: int = 50, **kwargs):
super().__init__(**kwargs)
self._title = title
self._max_rows = max_rows
self._rows: deque[tuple[str, list[float]]] = deque(maxlen=max_rows)
self._global_min: float = -40.0
self._global_max: float = 0.0
def compose(self) -> ComposeResult:
yield Static(f"[#00d4aa bold]{self._title}[/]", id="waterfall-title")
yield VerticalScroll(Static("", id="waterfall-lines"), id="waterfall-body")
def add_sweep(self, powers: list[float]) -> None:
"""Add a new sweep row and refresh."""
ts = datetime.now().strftime("%H:%M:%S")
self._rows.appendleft((ts, list(powers)))
# Update global range for consistent coloring
if powers:
self._global_min = min(self._global_min, min(powers))
self._global_max = max(self._global_max, max(powers))
self._refresh()
def clear(self) -> None:
self._rows.clear()
self._global_min = -40.0
self._global_max = 0.0
if self.is_mounted:
self.query_one("#waterfall-lines", Static).update("")
def _refresh(self) -> None:
if not self.is_mounted or not self._rows:
return
rng = self._global_max - self._global_min
if rng == 0:
rng = 1.0
lines = []
for ts, powers in self._rows:
chars = []
for p in powers:
ratio = (p - self._global_min) / rng
ratio = max(0.0, min(1.0, ratio))
idx = int(ratio * (len(_WATERFALL_COLORS) - 1))
color = _WATERFALL_COLORS[idx]
chars.append(f"[{color}]█[/]")
line = "".join(chars)
lines.append(f"[#506878]{ts}[/] {line}")
body = self.query_one("#waterfall-lines", Static)
body.update("\n".join(lines))

143
tui/uv.lock generated Normal file
View File

@ -0,0 +1,143 @@
version = 1
revision = 3
requires-python = ">=3.11"
[[package]]
name = "linkify-it-py"
version = "2.0.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "uc-micro-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]
[package.optional-dependencies]
linkify = [
{ name = "linkify-it-py" },
]
[[package]]
name = "mdit-py-plugins"
version = "0.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "platformdirs"
version = "4.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/71/25/ccd8e88fcd16a4eb6343a8b4b9635e6f3928a7ebcd82822a14d20e3ca29f/platformdirs-4.7.0.tar.gz", hash = "sha256:fd1a5f8599c85d49b9ac7d6e450bc2f1aaf4a23f1fe86d09952fe20ad365cf36", size = 23118, upload-time = "2026-02-12T22:21:53.764Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/e3/1eddccb2c39ecfbe09b3add42a04abcc3fa5b468aa4224998ffb8a7e9c8f/platformdirs-4.7.0-py3-none-any.whl", hash = "sha256:1ed8db354e344c5bb6039cd727f096af975194b508e37177719d562b2b540ee6", size = 18983, upload-time = "2026-02-12T22:21:52.237Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pyusb"
version = "1.3.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/00/6b/ce3727395e52b7b76dfcf0c665e37d223b680b9becc60710d4bc08b7b7cb/pyusb-1.3.1.tar.gz", hash = "sha256:3af070b607467c1c164f49d5b0caabe8ac78dbed9298d703a8dbf9df4052d17e", size = 77281, upload-time = "2025-01-08T23:45:01.866Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/28/b8/27e6312e86408a44fe16bd28ee12dd98608b39f7e7e57884a24e8f29b573/pyusb-1.3.1-py3-none-any.whl", hash = "sha256:bf9b754557af4717fe80c2b07cc2b923a9151f5c08d17bdb5345dac09d6a0430", size = 58465, upload-time = "2025-01-08T23:45:00.029Z" },
]
[[package]]
name = "rich"
version = "14.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/74/99/a4cab2acbb884f80e558b0771e97e21e939c5dfb460f488d19df485e8298/rich-14.3.2.tar.gz", hash = "sha256:e712f11c1a562a11843306f5ed999475f09ac31ffb64281f73ab29ffdda8b3b8", size = 230143, upload-time = "2026-02-01T16:20:47.908Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ef/45/615f5babd880b4bd7d405cc0dc348234c5ffb6ed1ea33e152ede08b2072d/rich-14.3.2-py3-none-any.whl", hash = "sha256:08e67c3e90884651da3239ea668222d19bea7b589149d8014a21c633420dbb69", size = 309963, upload-time = "2026-02-01T16:20:46.078Z" },
]
[[package]]
name = "skywalker-tui"
version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "pyusb" },
{ name = "textual" },
]
[package.metadata]
requires-dist = [
{ name = "pyusb", specifier = ">=1.3" },
{ name = "textual", specifier = ">=3.0" },
]
[[package]]
name = "textual"
version = "7.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py", extra = ["linkify"] },
{ name = "mdit-py-plugins" },
{ name = "platformdirs" },
{ name = "pygments" },
{ name = "rich" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
]
[[package]]
name = "uc-micro-py"
version = "1.0.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" },
]