From 567bf4d9e04e8111beda99e7bb0d578a16f0e83f Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sat, 14 Feb 2026 16:08:58 -0700 Subject: [PATCH] Add Device, Stream, and Config screens to TUI (F6-F8) Expand from 5 to 8 mode screens. F6 Device provides firmware management, EEPROM flash with full safety state machine (C2 validation, auto-backup, 3s countdown, page write, byte verify), and diagnostics (boot test, I2C scan, register dump). F7 Stream does live TS capture with PID distribution and PAT/PMT tree. F8 Config manages LNB power, DiSEqC switching, and modulation/FEC. Foundation: 12 new SkyWalker1 methods (device info, FX2 RAM, EEPROM I2C, diagnostics), matching DemoDevice synthetics with realistic C2 image and TS packets, 20 bridge wrappers (RLock). --- tools/skywalker_lib.py | 117 ++ tui/src/skywalker_tui/app.py | 17 +- tui/src/skywalker_tui/bridge.py | 110 +- tui/src/skywalker_tui/demo.py | 223 +++ tui/src/skywalker_tui/screens/config.py | 742 ++++++++++ tui/src/skywalker_tui/screens/device.py | 1236 +++++++++++++++++ tui/src/skywalker_tui/screens/stream.py | 401 ++++++ tui/src/skywalker_tui/theme.tcss | 30 + tui/src/skywalker_tui/widgets/config_bits.py | 46 + .../skywalker_tui/widgets/countdown_timer.py | 112 ++ tui/src/skywalker_tui/widgets/hex_view.py | 90 ++ tui/src/skywalker_tui/widgets/pid_table.py | 74 + tui/src/skywalker_tui/widgets/psi_tree.py | 111 ++ 13 files changed, 3304 insertions(+), 5 deletions(-) create mode 100644 tui/src/skywalker_tui/screens/config.py create mode 100644 tui/src/skywalker_tui/screens/device.py create mode 100644 tui/src/skywalker_tui/screens/stream.py create mode 100644 tui/src/skywalker_tui/widgets/config_bits.py create mode 100644 tui/src/skywalker_tui/widgets/countdown_timer.py create mode 100644 tui/src/skywalker_tui/widgets/hex_view.py create mode 100644 tui/src/skywalker_tui/widgets/pid_table.py create mode 100644 tui/src/skywalker_tui/widgets/psi_tree.py diff --git a/tools/skywalker_lib.py b/tools/skywalker_lib.py index 05c926d..f524ab4 100644 --- a/tools/skywalker_lib.py +++ b/tools/skywalker_lib.py @@ -480,6 +480,123 @@ class SkyWalker1: index=count, length=count) return bytes(data) + # -- Device info (extended) -- + + def get_serial_number(self) -> bytes: + """Read 8-byte serial number from device.""" + return self._vendor_in(CMD_GET_SERIAL_NUMBER, length=8) + + def get_usb_speed(self) -> int: + """Read USB connection speed. 0=unknown, 1=Full, 2=High.""" + data = self._vendor_in(0x07, length=1) + return data[0] + + def get_vendor_string(self) -> str: + """Read vendor string descriptor from FX2.""" + data = self._vendor_in(0x0C, length=64) + return bytes(data).rstrip(b'\x00').decode('ascii', errors='replace') + + def get_product_string(self) -> str: + """Read product string descriptor from FX2.""" + data = self._vendor_in(0x0D, length=64) + return bytes(data).rstrip(b'\x00').decode('ascii', errors='replace') + + # -- FX2 RAM access (standard Cypress A0 vendor request) -- + + def fx2_ram_read(self, addr: int, length: int) -> bytes: + """Read FX2 internal RAM via A0 vendor request. Non-destructive.""" + length = max(1, min(64, length)) + data = self.dev.ctrl_transfer( + usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN, + 0xA0, addr, 0, length, 2000 + ) + if self.verbose: + raw = bytes(data).hex(' ') + print(f" RAM IN addr=0x{addr:04X} len={length} -> {raw}") + return bytes(data) + + def fx2_ram_write(self, addr: int, data: bytes) -> int: + """Write FX2 internal RAM via A0 vendor request. Reverts on power cycle.""" + if self.verbose: + raw = data.hex(' ') + print(f" RAM OUT addr=0x{addr:04X} len={len(data)} data={raw}") + return self.dev.ctrl_transfer( + usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT, + 0xA0, addr, 0, data, 2000 + ) + + def fx2_cpu_halt(self) -> None: + """Halt FX2 CPU by writing 1 to CPUCS register (0xE600).""" + self.fx2_ram_write(0xE600, b'\x01') + + def fx2_cpu_start(self) -> None: + """Release FX2 CPU by writing 0 to CPUCS register (0xE600).""" + self.fx2_ram_write(0xE600, b'\x00') + + # -- EEPROM access (via I2C proxy commands) -- + + EEPROM_SLAVE = 0x51 + EEPROM_PAGE_SIZE = 16 + EEPROM_WRITE_CYCLE_MS = 10 + + def eeprom_read(self, offset: int, length: int = 64) -> bytes: + """Read from boot EEPROM at given offset via I2C.""" + return self._vendor_in(CMD_I2C_READ, value=self.EEPROM_SLAVE, + index=offset, length=length) + + def eeprom_write_page(self, offset: int, data: bytes) -> int: + """Write a page (up to 16 bytes) to EEPROM. Caller handles alignment.""" + return self._vendor_out(CMD_I2C_WRITE, value=self.EEPROM_SLAVE, + index=offset, data=data) + + def eeprom_read_all(self, size: int = 16384) -> bytes: + """Read entire EEPROM contents up to size bytes.""" + chunk_size = 64 + result = bytearray() + for offset in range(0, size, chunk_size): + remaining = min(chunk_size, size - offset) + chunk = self.eeprom_read(offset, remaining) + result.extend(chunk) + return bytes(result) + + # -- Diagnostics -- + + def boot_debug(self, mode: int) -> dict: + """ + Run boot diagnostic with specified mode byte. + + Modes: 0x80=no-op, 0x81=GPIO init, 0x82=I2C probe, + 0x83=BCM4500 reset, 0x84=FW load, 0x85=full boot. + Returns 3-byte status: {stage, result, detail}. + """ + data = self._vendor_in(CMD_BOOT_8PSK, value=mode, length=3) + return { + "stage": data[0], + "result": data[1], + "detail": data[2], + } + + def i2c_bus_scan(self) -> list[int]: + """ + Scan I2C bus for responding devices. + + Returns list of 7-bit slave addresses that ACK'd. + The firmware returns a 16-byte bitmap (128 bits for addresses 0-127). + """ + data = self._vendor_in(CMD_I2C_BUS_SCAN, length=16) + addresses = [] + for byte_idx in range(16): + for bit_idx in range(8): + if data[byte_idx] & (1 << bit_idx): + addresses.append(byte_idx * 8 + bit_idx) + return addresses + + def i2c_raw_read(self, slave: int, reg: int) -> int: + """Read a single register from an I2C device.""" + data = self._vendor_in(CMD_I2C_RAW_READ, value=slave, + index=reg, length=1) + return data[0] + # -- High-level sweep helpers -- def sweep_spectrum(self, start_mhz: float, stop_mhz: float, diff --git a/tui/src/skywalker_tui/app.py b/tui/src/skywalker_tui/app.py index b77ac85..2ea1dd3 100644 --- a/tui/src/skywalker_tui/app.py +++ b/tui/src/skywalker_tui/app.py @@ -1,9 +1,9 @@ """SkyWalker-1 TUI — main application. -Provides mode switching between 5 RF operating modes via a sidebar and F-key -shortcuts. Each mode is a Screen subclass that manages its own workers. +Provides mode switching between 8 operating modes via a sidebar and F-key +shortcuts. Each mode is a Container subclass that manages its own workers. -Note: We use "rf_mode" terminology for our 5 operating modes to avoid colliding +Note: We use "rf_mode" terminology for our operating modes to avoid colliding with Textual's built-in App.mode / _current_mode / _screen_stacks system. """ @@ -24,6 +24,9 @@ from skywalker_tui.screens.scan import ScanScreen from skywalker_tui.screens.monitor import MonitorScreen from skywalker_tui.screens.lband import LBandScreen from skywalker_tui.screens.track import TrackScreen +from skywalker_tui.screens.device import DeviceScreen +from skywalker_tui.screens.stream import StreamScreen +from skywalker_tui.screens.config import ConfigScreen MODES = { @@ -32,6 +35,9 @@ MODES = { "monitor": ("F3 Monitor", MonitorScreen), "lband": ("F4 L-Band", LBandScreen), "track": ("F5 Track", TrackScreen), + "device": ("F6 Device", DeviceScreen), + "stream": ("F7 Stream", StreamScreen), + "config": ("F8 Config", ConfigScreen), } @@ -48,6 +54,9 @@ class SkyWalkerApp(App): Binding("f3", "rf_mode('monitor')", "Monitor", show=True), Binding("f4", "rf_mode('lband')", "L-Band", show=True), Binding("f5", "rf_mode('track')", "Track", show=True), + Binding("f6", "rf_mode('device')", "Device", show=True), + Binding("f7", "rf_mode('stream')", "Stream", show=True), + Binding("f8", "rf_mode('config')", "Config", show=True), Binding("q", "quit", "Quit", show=True), Binding("d", "toggle_dark", "Theme", show=True), Binding("ctrl+w", "starwars", "Star Wars", show=False), @@ -100,7 +109,7 @@ class SkyWalkerApp(App): self.call_later(self._init_mode_screens) def _init_mode_screens(self) -> None: - """Mount all 5 mode screens into the content switcher.""" + """Mount all 8 mode screens into the content switcher.""" switcher = self.query_one("#content-area", ContentSwitcher) for mode_key, (_label, cls) in MODES.items(): screen = cls(self._bridge, id=f"screen-{mode_key}") diff --git a/tui/src/skywalker_tui/bridge.py b/tui/src/skywalker_tui/bridge.py index 8a0f78d..d53cf6c 100644 --- a/tui/src/skywalker_tui/bridge.py +++ b/tui/src/skywalker_tui/bridge.py @@ -17,7 +17,7 @@ class USBBridge: def __init__(self, device): self._dev = device - self._lock = threading.Lock() + self._lock = threading.RLock() @property def is_demo(self) -> bool: @@ -92,3 +92,111 @@ class USBBridge: if hasattr(self._dev, "blind_scan"): return self._dev.blind_scan(freq_khz, sr_min, sr_max, sr_step) return None + + # -- Device info (extended) -- + + def get_serial_number(self) -> bytes: + with self._lock: + return self._dev.get_serial_number() + + def get_usb_speed(self) -> int: + with self._lock: + return self._dev.get_usb_speed() + + def get_vendor_string(self) -> str: + with self._lock: + return self._dev.get_vendor_string() + + def get_product_string(self) -> str: + with self._lock: + return self._dev.get_product_string() + + # -- FX2 RAM -- + + def fx2_ram_read(self, addr: int, length: int) -> bytes: + with self._lock: + return self._dev.fx2_ram_read(addr, length) + + def fx2_ram_write(self, addr: int, data: bytes) -> int: + with self._lock: + return self._dev.fx2_ram_write(addr, data) + + def fx2_cpu_halt(self) -> None: + with self._lock: + self._dev.fx2_cpu_halt() + + def fx2_cpu_start(self) -> None: + with self._lock: + self._dev.fx2_cpu_start() + + # -- EEPROM -- + + def eeprom_read(self, offset: int, length: int = 64) -> bytes: + with self._lock: + return self._dev.eeprom_read(offset, length) + + def eeprom_write_page(self, offset: int, data: bytes) -> int: + with self._lock: + return self._dev.eeprom_write_page(offset, data) + + def eeprom_read_all(self, size: int = 16384) -> bytes: + with self._lock: + return self._dev.eeprom_read_all(size) + + # -- Diagnostics -- + + def boot_debug(self, mode: int) -> dict: + with self._lock: + return self._dev.boot_debug(mode) + + def i2c_bus_scan(self) -> list[int]: + with self._lock: + return self._dev.i2c_bus_scan() + + def i2c_raw_read(self, slave: int, reg: int) -> int: + with self._lock: + return self._dev.i2c_raw_read(slave, reg) + + # -- Streaming -- + + def arm_transfer(self, on: bool) -> None: + with self._lock: + self._dev.arm_transfer(on) + + def read_stream(self, size: int = 8192, timeout: int = 1000) -> bytes: + with self._lock: + return self._dev.read_stream(size, timeout) + + # -- Config -- + + def send_diseqc_message(self, msg: bytes) -> None: + with self._lock: + self._dev.send_diseqc_message(msg) + + def send_diseqc_tone_burst(self, mini_cmd: int) -> None: + with self._lock: + self._dev.send_diseqc_tone_burst(mini_cmd) + + def start_intersil(self, on: bool = True) -> int: + with self._lock: + return self._dev.start_intersil(on) + + def set_extra_voltage(self, on: bool) -> None: + with self._lock: + self._dev.set_extra_voltage(on) + + def boot(self, on: bool = True) -> int: + with self._lock: + return self._dev.boot(on) + + def get_signal_lock(self) -> bool: + with self._lock: + return self._dev.get_signal_lock() + + def get_signal_strength(self) -> dict: + with self._lock: + return self._dev.get_signal_strength() + + def multi_reg_read(self, start_reg: int, count: int) -> bytes: + with self._lock: + return self._dev.multi_reg_read(start_reg, count) diff --git a/tui/src/skywalker_tui/demo.py b/tui/src/skywalker_tui/demo.py index cd71a1a..734a681 100644 --- a/tui/src/skywalker_tui/demo.py +++ b/tui/src/skywalker_tui/demo.py @@ -13,6 +13,7 @@ This enables full TUI development and testing without hardware. import math import random +import struct import time @@ -28,6 +29,31 @@ _TRANSPONDERS = [ _NOISE_FLOOR = -35.0 _LOCK_THRESHOLD_DB = 3.5 +# Simulated PIDs for synthetic TS packets +_DEMO_PIDS = [0x0000, 0x0100, 0x0101, 0x0102, 0x1FFF] +_DEMO_PID_WEIGHTS = [1, 5, 40, 20, 34] # rough % distribution + + +def _build_demo_eeprom() -> bytearray: + """Build a valid C2 boot EEPROM image for demo mode.""" + img = bytearray() + # C2 header: magic, VID_L, VID_H, PID_L, PID_H, DID_L, DID_H, CONFIG + img.append(0xC2) + img.extend(struct.pack('HH', code_len, 0x0000)) + img.extend(bytes(range(256)) * 2) # repeating pattern + + # END marker: 0x8001 + entry point 0x0000 + img.extend(struct.pack('>HH', 0x8001, 0x0000)) + + # Pad to 16KB with 0xFF (like a real blank EEPROM region) + img.extend(b'\xFF' * (16384 - len(img))) + return img + class DemoDevice: """Drop-in replacement for SkyWalker1 that generates synthetic data.""" @@ -39,10 +65,14 @@ class DemoDevice: self._lnb_on = False self._lnb_voltage_high = False self._tone_22khz = False + self._extra_voltage = False self._tuned_freq_khz = 0 self._tuned_sr_sps = 0 + self._armed = False self._start_time = time.monotonic() self._sample_count = 0 + self._eeprom = _build_demo_eeprom() + self._cc_counters: dict[int, int] = {pid: 0 for pid in _DEMO_PIDS} def open(self): pass @@ -109,6 +139,9 @@ class DemoDevice: def start_intersil(self, on: bool = True): self._lnb_on = on + def set_extra_voltage(self, on: bool): + self._extra_voltage = on + def tune(self, symbol_rate_sps: int, freq_khz: int, mod_index: int, fec_index: int): self._tuned_freq_khz = freq_khz @@ -211,6 +244,196 @@ class DemoDevice: } return None + # --- Device info (extended) --- + + def get_serial_number(self) -> bytes: + time.sleep(0.005) + return b'DEMO0001' + + def get_usb_speed(self) -> int: + time.sleep(0.002) + return 2 # High speed + + def get_vendor_string(self) -> str: + time.sleep(0.005) + return "Genpix Electronics" + + def get_product_string(self) -> str: + time.sleep(0.005) + return "SkyWalker-1 DVB-S" + + # --- FX2 RAM access --- + + def fx2_ram_read(self, addr: int, length: int) -> bytes: + time.sleep(0.003) + # Return pseudo-random but deterministic bytes based on address + rng = random.Random(addr) + return bytes(rng.randint(0, 255) for _ in range(length)) + + def fx2_ram_write(self, addr: int, data: bytes) -> int: + time.sleep(0.003) + return len(data) + + def fx2_cpu_halt(self) -> None: + time.sleep(0.005) + + def fx2_cpu_start(self) -> None: + time.sleep(0.005) + + # --- EEPROM access --- + + def eeprom_read(self, offset: int, length: int = 64) -> bytes: + time.sleep(0.005) + end = min(offset + length, len(self._eeprom)) + return bytes(self._eeprom[offset:end]) + + def eeprom_write_page(self, offset: int, data: bytes) -> int: + time.sleep(0.012) # simulated write cycle + for i, b in enumerate(data): + if offset + i < len(self._eeprom): + self._eeprom[offset + i] = b + return len(data) + + def eeprom_read_all(self, size: int = 16384) -> bytes: + chunk_size = 64 + result = bytearray() + for offset in range(0, size, chunk_size): + remaining = min(chunk_size, size - offset) + result.extend(self.eeprom_read(offset, remaining)) + return bytes(result) + + # --- Diagnostics --- + + _BOOT_STAGES = { + 0x80: {"stage": 0x00, "result": 0x00, "detail": 0x00}, # no-op + 0x81: {"stage": 0x01, "result": 0x01, "detail": 0x00}, # GPIO OK + 0x82: {"stage": 0x02, "result": 0x01, "detail": 0x02}, # I2C probe OK + 0x83: {"stage": 0x03, "result": 0x01, "detail": 0x00}, # BCM4500 reset OK + 0x84: {"stage": 0x04, "result": 0x01, "detail": 0x00}, # FW load OK + 0x85: {"stage": 0x05, "result": 0x01, "detail": 0x00}, # full boot OK + } + + def boot_debug(self, mode: int) -> dict: + time.sleep(0.05) + return dict(self._BOOT_STAGES.get(mode, { + "stage": mode & 0x0F, "result": 0x00, "detail": 0xFF, + })) + + def i2c_bus_scan(self) -> list[int]: + time.sleep(0.1) + return [0x08, 0x51] # BCM4500 at 0x08, EEPROM at 0x51 + + def i2c_raw_read(self, slave: int, reg: int) -> int: + time.sleep(0.01) + return random.randint(0, 255) + + # --- Streaming --- + + def arm_transfer(self, on: bool) -> None: + self._armed = on + time.sleep(0.01) + + def read_stream(self, size: int = 8192, timeout: int = 1000) -> bytes: + """Generate synthetic TS packets with proper structure.""" + if not self._armed: + return b'' + time.sleep(0.02) + + packets = bytearray() + num_packets = size // 188 + + for _ in range(num_packets): + # Pick a PID based on weights + pid = random.choices(_DEMO_PIDS, weights=_DEMO_PID_WEIGHTS, k=1)[0] + cc = self._cc_counters[pid] + self._cc_counters[pid] = (cc + 1) & 0x0F + + # Occasionally inject a CC error (~0.1%) + if random.random() < 0.001 and pid != 0x1FFF: + self._cc_counters[pid] = (self._cc_counters[pid] + 1) & 0x0F + + # Build 188-byte TS packet + pkt = bytearray(188) + pkt[0] = 0x47 # sync byte + pkt[1] = (pid >> 8) & 0x1F + pkt[2] = pid & 0xFF + pkt[3] = 0x10 | (cc & 0x0F) # payload only + CC + + if pid == 0x0000: + # PAT packet with PUSI + pkt[1] |= 0x40 # PUSI + pkt[4] = 0x00 # pointer field + # PAT section: table_id=0x00, one program + pat = bytearray([ + 0x00, # table_id + 0xB0, 0x0D, # section_syntax + length=13 + 0x00, 0x01, # transport_stream_id + 0xC1, # version=0, current + 0x00, 0x00, # section_number, last_section + 0x00, 0x01, # program_number=1 + 0xE1, 0x00, # PMT PID=0x0100 + # CRC32 placeholder + 0x00, 0x00, 0x00, 0x00, + ]) + pkt[5:5 + len(pat)] = pat + + elif pid == 0x0100: + # PMT packet with PUSI + pkt[1] |= 0x40 + pkt[4] = 0x00 + pmt = bytearray([ + 0x02, # table_id + 0xB0, 0x17, # section_syntax + length=23 + 0x00, 0x01, # program_number=1 + 0xC1, # version=0 + 0x00, 0x00, # section_number + 0xE1, 0x01, # PCR PID=0x0101 + 0xF0, 0x00, # program info length=0 + # Stream 1: MPEG-2 Video on PID 0x0101 + 0x02, 0xE1, 0x01, 0xF0, 0x00, + # Stream 2: MPEG-1 Audio on PID 0x0102 + 0x03, 0xE1, 0x02, 0xF0, 0x00, + # CRC32 placeholder + 0x00, 0x00, 0x00, 0x00, + ]) + pkt[5:5 + len(pmt)] = pmt + + else: + # Fill payload with pseudo-random data + for i in range(4, 188): + pkt[i] = random.randint(0, 255) + pkt[3] = 0x10 | (cc & 0x0F) + + packets.extend(pkt) + + return bytes(packets) + + # --- DiSEqC --- + + def send_diseqc_tone_burst(self, mini_cmd: int) -> None: + time.sleep(0.05) + + def send_diseqc_message(self, msg: bytes) -> None: + time.sleep(0.05) + + def get_signal_lock(self) -> bool: + sig = self.signal_monitor() + return sig["locked"] + + def get_signal_strength(self) -> dict: + sig = self.signal_monitor() + return { + "snr_raw": sig["snr_raw"], + "snr_db": sig["snr_db"], + "snr_pct": sig["snr_pct"], + "raw_bytes": "00 00 00 00 00 00", + } + + def multi_reg_read(self, start_reg: int, count: int) -> bytes: + time.sleep(0.01) + rng = random.Random(start_reg) + return bytes(rng.randint(0, 255) for _ in range(count)) + # --- Internal signal model --- def _power_at(self, freq_mhz: float, elapsed: float) -> float: diff --git a/tui/src/skywalker_tui/screens/config.py b/tui/src/skywalker_tui/screens/config.py new file mode 100644 index 0000000..dbddb14 --- /dev/null +++ b/tui/src/skywalker_tui/screens/config.py @@ -0,0 +1,742 @@ +"""Config screen — LNB power, DiSEqC switching, and modulation/FEC setup. + +Manages the hardware configuration layer: LNB voltage/tone control, DiSEqC +port switching (committed commands, tone burst, raw hex), and the tuning +parameter set (modulation, FEC, symbol rate, frequency). Bottom status bar +shows live config register state from the device. +""" + +from textual.app import ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Label, Input, Button, Static, Select +from textual import work +from textual.worker import Worker + +from skywalker_lib import MODULATIONS, FEC_RATES, MOD_FEC_GROUP, format_config_bits + + +def _fec_options_for_mod(mod_key: str) -> list[tuple[str, str]]: + """Return (label, value) tuples for the FEC Select dropdown.""" + group = MOD_FEC_GROUP.get(mod_key, "dvbs") + rates = FEC_RATES.get(group, {}) + return [(rate_name, rate_name) for rate_name in rates] + + +def _mod_options() -> list[tuple[str, str]]: + """Return (label, value) tuples for the Modulation Select dropdown.""" + return [(desc, key) for key, (_idx, desc) in MODULATIONS.items()] + + +class ConfigScreen(Container): + """LNB, DiSEqC, and modulation/FEC configuration panel.""" + + DEFAULT_CSS = """ + ConfigScreen { + layout: vertical; + } + ConfigScreen #cfg-main { + height: 1fr; + layout: horizontal; + padding: 1 2; + } + + /* --- Three column panels --- */ + + ConfigScreen .cfg-panel { + width: 1fr; + background: #0e1420; + border: round #1a2a3a; + padding: 1 2; + margin: 0 1 0 0; + layout: vertical; + } + ConfigScreen .cfg-panel:last-of-type { + margin: 0; + } + ConfigScreen .cfg-panel-title { + color: #00d4aa; + text-style: bold; + margin: 0 0 1 0; + height: 1; + } + + /* --- Buttons within panels --- */ + + ConfigScreen .cfg-btn-row { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + ConfigScreen .cfg-btn-row Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + min-width: 12; + } + ConfigScreen .cfg-btn-row Button { + margin: 0 1 0 0; + background: #1a2a40; + color: #c8d0d8; + border: round #1a3050; + } + ConfigScreen .cfg-btn-row Button:hover { + background: #00d4aa; + color: #0a0a12; + } + ConfigScreen .cfg-btn-row Button.-active-setting { + background: #0a3a3a; + color: #00d4aa; + border: round #00d4aa; + text-style: bold; + } + + /* --- Warning label --- */ + + ConfigScreen .cfg-warning { + color: #e8a020; + margin: 1 0 0 0; + text-style: italic; + height: auto; + } + + /* --- DiSEqC port buttons --- */ + + ConfigScreen #cfg-diseqc-ports { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + ConfigScreen #cfg-diseqc-ports Button { + margin: 0 1 0 0; + min-width: 10; + background: #1a2a40; + color: #c8d0d8; + border: round #1a3050; + } + ConfigScreen #cfg-diseqc-ports Button:hover { + background: #00d4aa; + color: #0a0a12; + } + ConfigScreen #cfg-diseqc-ports Button.-active-setting { + background: #0a3a3a; + color: #00d4aa; + border: round #00d4aa; + text-style: bold; + } + + /* --- Raw DiSEqC input row --- */ + + ConfigScreen #cfg-diseqc-raw { + height: auto; + layout: horizontal; + margin: 1 0 0 0; + } + ConfigScreen #cfg-diseqc-raw Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + ConfigScreen #cfg-diseqc-raw Input { + width: 1fr; + margin: 0 1 0 0; + background: #121c2a; + border: round #1a3050; + color: #c8d0d8; + } + ConfigScreen #cfg-diseqc-raw Input:focus { + border: round #00d4aa; + } + ConfigScreen #cfg-diseqc-raw Button { + margin: 0; + background: #1a2a40; + color: #c8d0d8; + border: round #1a3050; + } + + /* --- Modulation/FEC panel --- */ + + ConfigScreen .cfg-field-row { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + ConfigScreen .cfg-field-row Label { + width: auto; + min-width: 12; + margin: 1 1 0 0; + color: #506878; + } + ConfigScreen .cfg-field-row Select { + width: 1fr; + } + ConfigScreen .cfg-field-row Input { + width: 1fr; + background: #121c2a; + border: round #1a3050; + color: #c8d0d8; + } + ConfigScreen .cfg-field-row Input:focus { + border: round #00d4aa; + } + ConfigScreen #cfg-tune-btn { + margin: 1 0 0 0; + width: 100%; + } + + /* --- Bottom status bar --- */ + + ConfigScreen #cfg-status-bar { + height: auto; + min-height: 3; + padding: 1 2; + background: #0e1018; + border-top: solid #1a2a3a; + } + ConfigScreen #cfg-result { + height: auto; + min-height: 2; + padding: 0 2; + background: #0e1018; + } + """ + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._refresh_worker: Worker | None = None + self._active_port: int | None = None + self._lnb_power = False + self._lnb_voltage_high = False + self._tone_22khz = False + self._extra_volt = False + + def compose(self) -> ComposeResult: + with Horizontal(id="cfg-main"): + # --- LNB Control --- + with Vertical(classes="cfg-panel"): + yield Static("[#00d4aa bold]LNB Control[/]", classes="cfg-panel-title") + + with Horizontal(classes="cfg-btn-row"): + yield Label("Power:") + yield Button("On", id="cfg-lnb-on", variant="success") + yield Button("Off", id="cfg-lnb-off", variant="error") + + with Horizontal(classes="cfg-btn-row"): + yield Label("Voltage:") + yield Button("13V", id="cfg-volt-13") + yield Button("18V", id="cfg-volt-18") + + with Horizontal(classes="cfg-btn-row"): + yield Label("22kHz Tone:") + yield Button("On", id="cfg-tone-on") + yield Button("Off", id="cfg-tone-off") + + with Horizontal(classes="cfg-btn-row"): + yield Label("Extra +1V:") + yield Button("On", id="cfg-extra-on") + yield Button("Off", id="cfg-extra-off") + + yield Static( + "[#e8a020]Max 450mA continuous load[/]", + classes="cfg-warning", + ) + + # --- DiSEqC Switch --- + with Vertical(classes="cfg-panel"): + yield Static("[#00d4aa bold]DiSEqC Switch[/]", classes="cfg-panel-title") + + yield Label("Committed Port:", classes="cfg-section-label") + with Horizontal(id="cfg-diseqc-ports"): + yield Button("Port 1", id="cfg-port-1") + yield Button("Port 2", id="cfg-port-2") + yield Button("Port 3", id="cfg-port-3") + yield Button("Port 4", id="cfg-port-4") + + with Horizontal(classes="cfg-btn-row"): + yield Label("Tone Burst:") + yield Button("A", id="cfg-burst-a") + yield Button("B", id="cfg-burst-b") + + with Horizontal(id="cfg-diseqc-raw"): + yield Label("Raw Hex:") + yield Input( + placeholder="E0 10 38 F0", + id="cfg-diseqc-hex", + ) + yield Button("Send", id="cfg-diseqc-send") + + # --- Modulation / FEC --- + with Vertical(classes="cfg-panel"): + yield Static( + "[#00d4aa bold]Modulation / FEC[/]", + classes="cfg-panel-title", + ) + + with Horizontal(classes="cfg-field-row"): + yield Label("Modulation:") + yield Select( + _mod_options(), + value="qpsk", + id="cfg-mod-select", + allow_blank=False, + ) + + with Horizontal(classes="cfg-field-row"): + yield Label("FEC Rate:") + yield Select( + _fec_options_for_mod("qpsk"), + value="auto", + id="cfg-fec-select", + allow_blank=False, + ) + + with Horizontal(classes="cfg-field-row"): + yield Label("Symbol Rate:") + yield Input("20000", id="cfg-sr-input") + yield Label("ksps") + + with Horizontal(classes="cfg-field-row"): + yield Label("Frequency:") + yield Input("1200", id="cfg-freq-input") + yield Label("MHz") + + yield Button( + "Tune", + id="cfg-tune-btn", + variant="success", + ) + + yield Static("", id="cfg-result") + yield Static("[#506878]Config: reading...[/]", id="cfg-status-bar") + + # ── Lifecycle ── + + def on_show(self) -> None: + self._refresh_config() + + def on_hide(self) -> None: + if self._refresh_worker is not None: + self._refresh_worker.cancel() + self._refresh_worker = None + + # ── Event handlers ── + + def on_button_pressed(self, event: Button.Pressed) -> None: + btn = event.button.id + if btn is None: + return + + # LNB control + if btn == "cfg-lnb-on": + self._do_lnb_power(True) + elif btn == "cfg-lnb-off": + self._do_lnb_power(False) + elif btn == "cfg-volt-13": + self._do_lnb_voltage(high=False) + elif btn == "cfg-volt-18": + self._do_lnb_voltage(high=True) + elif btn == "cfg-tone-on": + self._do_22khz_tone(on=True) + elif btn == "cfg-tone-off": + self._do_22khz_tone(on=False) + elif btn == "cfg-extra-on": + self._do_extra_voltage(on=True) + elif btn == "cfg-extra-off": + self._do_extra_voltage(on=False) + + # DiSEqC ports + elif btn == "cfg-port-1": + self._do_diseqc_port(1) + elif btn == "cfg-port-2": + self._do_diseqc_port(2) + elif btn == "cfg-port-3": + self._do_diseqc_port(3) + elif btn == "cfg-port-4": + self._do_diseqc_port(4) + + # Tone burst + elif btn == "cfg-burst-a": + self._do_tone_burst(0) + elif btn == "cfg-burst-b": + self._do_tone_burst(1) + + # Raw DiSEqC + elif btn == "cfg-diseqc-send": + self._do_diseqc_raw() + + # Tune + elif btn == "cfg-tune-btn": + self._do_tune() + + def on_select_changed(self, event: Select.Changed) -> None: + if event.select.id == "cfg-mod-select": + mod_key = str(event.value) + self._update_fec_options(mod_key) + + # ── LNB operations ── + + @work(thread=True) + def _do_lnb_power(self, on: bool) -> None: + try: + self._bridge.start_intersil(on) + self._lnb_power = on + label = "[#00d4aa]ON[/]" if on else "[#e04040]OFF[/]" + self.app.call_from_thread( + self._show_result, + f"[#c8d0d8]LNB power:[/] {label}", + ) + self.app.call_from_thread(self._highlight_lnb_power, on) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]LNB power error: {e}[/]", + ) + self.app.call_from_thread(self._refresh_config) + + @work(thread=True) + def _do_lnb_voltage(self, high: bool) -> None: + try: + self._bridge.set_lnb_voltage(high) + self._lnb_voltage_high = high + volts = "18V" if high else "13V" + self.app.call_from_thread( + self._show_result, + f"[#c8d0d8]LNB voltage set to[/] [#00d4aa]{volts}[/]", + ) + self.app.call_from_thread(self._highlight_voltage, high) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]Voltage error: {e}[/]", + ) + self.app.call_from_thread(self._refresh_config) + + @work(thread=True) + def _do_22khz_tone(self, on: bool) -> None: + try: + self._bridge.set_22khz_tone(on) + self._tone_22khz = on + label = "[#00d4aa]ON[/]" if on else "[#e04040]OFF[/]" + self.app.call_from_thread( + self._show_result, + f"[#c8d0d8]22kHz tone:[/] {label}", + ) + self.app.call_from_thread(self._highlight_tone, on) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]22kHz tone error: {e}[/]", + ) + self.app.call_from_thread(self._refresh_config) + + @work(thread=True) + def _do_extra_voltage(self, on: bool) -> None: + try: + self._bridge.set_extra_voltage(on) + self._extra_volt = on + label = "[#00d4aa]ON[/]" if on else "[#e04040]OFF[/]" + self.app.call_from_thread( + self._show_result, + f"[#c8d0d8]Extra +1V:[/] {label}", + ) + self.app.call_from_thread(self._highlight_extra, on) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]Extra voltage error: {e}[/]", + ) + self.app.call_from_thread(self._refresh_config) + + # ── DiSEqC operations ── + + _DISEQC_PORT_CMDS = { + 1: bytes([0xE0, 0x10, 0x38, 0xF0]), + 2: bytes([0xE0, 0x10, 0x38, 0xF4]), + 3: bytes([0xE0, 0x10, 0x38, 0xF8]), + 4: bytes([0xE0, 0x10, 0x38, 0xFC]), + } + + @work(thread=True) + def _do_diseqc_port(self, port: int) -> None: + cmd = self._DISEQC_PORT_CMDS[port] + try: + self._bridge.send_diseqc_message(cmd) + self._active_port = port + self.app.call_from_thread( + self._show_result, + f"[#c8d0d8]DiSEqC port[/] [#00d4aa]{port}[/]" + f" [#506878]({cmd.hex(' ')})[/]", + ) + self.app.call_from_thread(self._highlight_port, port) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]DiSEqC port {port} error: {e}[/]", + ) + + @work(thread=True) + def _do_tone_burst(self, mini_cmd: int) -> None: + label = "A" if mini_cmd == 0 else "B" + try: + self._bridge.send_diseqc_tone_burst(mini_cmd) + self.app.call_from_thread( + self._show_result, + f"[#c8d0d8]Tone burst[/] [#00d4aa]{label}[/]", + ) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]Tone burst error: {e}[/]", + ) + + @work(thread=True) + def _do_diseqc_raw(self) -> None: + try: + hex_input = self.app.call_from_thread(self._get_diseqc_hex) + except Exception: + return + + if not hex_input: + self.app.call_from_thread( + self._show_result, + "[#e8a020]Enter hex bytes (e.g. E0 10 38 F0)[/]", + ) + return + + try: + raw = bytes.fromhex(hex_input.replace(",", " ")) + except ValueError: + self.app.call_from_thread( + self._show_result, + f"[#e04040]Invalid hex: {hex_input}[/]", + ) + return + + if len(raw) < 3 or len(raw) > 6: + self.app.call_from_thread( + self._show_result, + f"[#e04040]DiSEqC message must be 3-6 bytes, got {len(raw)}[/]", + ) + return + + try: + self._bridge.send_diseqc_message(raw) + self.app.call_from_thread( + self._show_result, + f"[#c8d0d8]DiSEqC sent:[/] [#00d4aa]{raw.hex(' ')}[/]", + ) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]DiSEqC send error: {e}[/]", + ) + + def _get_diseqc_hex(self) -> str: + """Read the raw hex input value (must be called from UI thread).""" + return self.query_one("#cfg-diseqc-hex", Input).value.strip() + + # ── Tune operation ── + + @work(thread=True) + def _do_tune(self) -> None: + # Read inputs from UI thread + try: + params = self.app.call_from_thread(self._read_tune_params) + except Exception: + return + + if params is None: + return + + mod_key, fec_key, sr_ksps, freq_mhz = params + + # Validate + if not (256 <= sr_ksps <= 30000): + self.app.call_from_thread( + self._show_result, + "[#e04040]Symbol rate out of range (256-30000 ksps)[/]", + ) + return + + if not (950 <= freq_mhz <= 2150): + self.app.call_from_thread( + self._show_result, + "[#e04040]Frequency out of range (950-2150 MHz)[/]", + ) + return + + mod_index = MODULATIONS[mod_key][0] + fec_group = MOD_FEC_GROUP.get(mod_key, "dvbs") + fec_rates = FEC_RATES.get(fec_group, {}) + fec_index = fec_rates.get(fec_key, 0) + + sr_sps = sr_ksps * 1000 + freq_khz = int(freq_mhz * 1000) + + try: + self._bridge.ensure_booted() + self._bridge.tune(sr_sps, freq_khz, mod_index, fec_index) + mod_desc = MODULATIONS[mod_key][1] + self.app.call_from_thread( + self._show_result, + f"[#00d4aa]Tuned:[/] [#c8d0d8]{freq_mhz:.1f} MHz " + f"{sr_ksps} ksps {mod_desc} FEC {fec_key}[/]", + ) + except Exception as e: + self.app.call_from_thread( + self._show_result, + f"[#e04040]Tune error: {e}[/]", + ) + self.app.call_from_thread(self._refresh_config) + + def _read_tune_params(self) -> tuple | None: + """Read tune parameters from UI widgets (must be called from UI thread).""" + mod_select = self.query_one("#cfg-mod-select", Select) + fec_select = self.query_one("#cfg-fec-select", Select) + sr_input = self.query_one("#cfg-sr-input", Input) + freq_input = self.query_one("#cfg-freq-input", Input) + + mod_key = str(mod_select.value) if mod_select.value is not None else "qpsk" + fec_key = str(fec_select.value) if fec_select.value is not None else "auto" + + try: + sr_ksps = int(float(sr_input.value or "20000")) + except ValueError: + self._show_result("[#e04040]Invalid symbol rate[/]") + return None + + try: + freq_mhz = float(freq_input.value or "1200") + except ValueError: + self._show_result("[#e04040]Invalid frequency[/]") + return None + + return (mod_key, fec_key, sr_ksps, freq_mhz) + + # ── FEC dropdown update ── + + def _update_fec_options(self, mod_key: str) -> None: + """Rebuild the FEC dropdown when modulation changes.""" + fec_select = self.query_one("#cfg-fec-select", Select) + options = _fec_options_for_mod(mod_key) + fec_select.set_options(options) + # Default to "auto" if available, otherwise first option + auto_keys = [v for (_l, v) in options if v == "auto"] + if auto_keys: + fec_select.value = "auto" + elif options: + fec_select.value = options[0][1] + + # ── Config status display ── + + @work(thread=True) + def _refresh_config(self) -> None: + """Read config register and update the status bar.""" + try: + status = self._bridge.get_config() + bits = format_config_bits(status) + self.app.call_from_thread(self._display_config, status, bits) + except Exception as e: + self.app.call_from_thread( + self._display_config_error, str(e), + ) + + def _display_config(self, status: int, bits: list) -> None: + """Render config bits in the status bar (UI thread).""" + if not self.is_mounted: + return + + parts = [] + for name, is_set in bits: + if is_set: + parts.append(f"[#00d4aa]{name}[/]") + else: + parts.append(f"[#303840]{name}[/]") + + text = ( + f"[#506878]Config 0x{status:02X}:[/] " + + " ".join(parts) + ) + self.query_one("#cfg-status-bar", Static).update(text) + + # Sync button highlights from config bits + self._lnb_power = bool(status & 0x04) + self._lnb_voltage_high = bool(status & 0x20) + self._tone_22khz = bool(status & 0x10) + + self._highlight_lnb_power(self._lnb_power) + self._highlight_voltage(self._lnb_voltage_high) + self._highlight_tone(self._tone_22khz) + + def _display_config_error(self, error: str) -> None: + if not self.is_mounted: + return + self.query_one("#cfg-status-bar", Static).update( + f"[#e04040]Config read error: {error}[/]" + ) + + # ── UI highlight helpers ── + + def _show_result(self, markup: str) -> None: + """Display operation result text.""" + if not self.is_mounted: + return + self.query_one("#cfg-result", Static).update(markup) + + def _highlight_lnb_power(self, on: bool) -> None: + if not self.is_mounted: + return + btn_on = self.query_one("#cfg-lnb-on", Button) + btn_off = self.query_one("#cfg-lnb-off", Button) + if on: + btn_on.add_class("-active-setting") + btn_off.remove_class("-active-setting") + else: + btn_off.add_class("-active-setting") + btn_on.remove_class("-active-setting") + + def _highlight_voltage(self, high: bool) -> None: + if not self.is_mounted: + return + btn_13 = self.query_one("#cfg-volt-13", Button) + btn_18 = self.query_one("#cfg-volt-18", Button) + if high: + btn_18.add_class("-active-setting") + btn_13.remove_class("-active-setting") + else: + btn_13.add_class("-active-setting") + btn_18.remove_class("-active-setting") + + def _highlight_tone(self, on: bool) -> None: + if not self.is_mounted: + return + btn_on = self.query_one("#cfg-tone-on", Button) + btn_off = self.query_one("#cfg-tone-off", Button) + if on: + btn_on.add_class("-active-setting") + btn_off.remove_class("-active-setting") + else: + btn_off.add_class("-active-setting") + btn_on.remove_class("-active-setting") + + def _highlight_extra(self, on: bool) -> None: + if not self.is_mounted: + return + btn_on = self.query_one("#cfg-extra-on", Button) + btn_off = self.query_one("#cfg-extra-off", Button) + if on: + btn_on.add_class("-active-setting") + btn_off.remove_class("-active-setting") + else: + btn_off.add_class("-active-setting") + btn_on.remove_class("-active-setting") + + def _highlight_port(self, port: int) -> None: + if not self.is_mounted: + return + for p in range(1, 5): + btn = self.query_one(f"#cfg-port-{p}", Button) + if p == port: + btn.add_class("-active-setting") + else: + btn.remove_class("-active-setting") diff --git a/tui/src/skywalker_tui/screens/device.py b/tui/src/skywalker_tui/screens/device.py new file mode 100644 index 0000000..94ca261 --- /dev/null +++ b/tui/src/skywalker_tui/screens/device.py @@ -0,0 +1,1236 @@ +"""Device screen -- firmware management, EEPROM operations, and diagnostics. + +This is the most complex screen in the TUI. It provides three tabs: + + Firmware: FX2 RAM read/write, CPU halt/start + EEPROM: Read, backup, and flash (safety-critical!) C2 firmware images + Diagnostics: Boot stage tests, I2C bus scan, BCM4500 register dump + +The EEPROM flash operation follows a strict state machine to prevent +accidental writes to the boot EEPROM, which would brick the device: + + idle -> file_selected -> validated -> backup -> countdown -> writing -> verifying -> done + |-> invalid (red) |-> error (persistent) + +A corrupted EEPROM means no USB enumeration until an external programmer +is used. Every safety gate is mandatory and cannot be bypassed via the UI. +""" + +import os +import time +from datetime import datetime + +from textual.app import ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import ( + Label, Input, Button, Static, Select, ProgressBar, +) +from textual import work +from textual.worker import Worker + +from eeprom_write import parse_c2_header, parse_records + +from skywalker_tui.widgets.hex_view import HexView +from skywalker_tui.widgets.countdown_timer import CountdownTimer +from skywalker_tui.widgets.config_bits import ConfigBitsDisplay + + +# EEPROM constants (must match eeprom_write.py and skywalker_lib.py) +EEPROM_PAGE_SIZE = 16 +EEPROM_WRITE_CYCLE_MS = 10 +MAX_EEPROM_SIZE = 16384 +VENDOR_ID = 0x09C0 +PRODUCT_ID = 0x0203 + + +class DeviceScreen(Container): + """Firmware management, EEPROM flash, and hardware diagnostics.""" + + DEFAULT_CSS = """ + DeviceScreen { + layout: vertical; + } + + /* --- Identity panel (always visible) --- */ + DeviceScreen #dev-identity { + height: auto; + padding: 1 2; + background: #0e1420; + border-bottom: solid #1a2a3a; + } + DeviceScreen #dev-identity-title { + color: #00d4aa; + text-style: bold; + margin: 0 0 1 0; + } + DeviceScreen #dev-identity-info { + color: #c8d0d8; + } + DeviceScreen #dev-demo-badge { + color: #e8a020; + text-style: bold; + margin: 0 0 0 0; + } + + /* --- Tab row --- */ + DeviceScreen #dev-tab-row { + height: auto; + padding: 0 2; + background: #0e1018; + layout: horizontal; + } + DeviceScreen .dev-tab-btn { + margin: 0 1 0 0; + min-width: 16; + } + DeviceScreen .dev-tab-btn.-active { + background: #0a2a3a; + color: #00d4aa; + border: round #00d4aa; + text-style: bold; + } + + /* --- Tab content panels --- */ + DeviceScreen #dev-tab-content { + height: 1fr; + } + DeviceScreen .dev-tab-panel { + height: 1fr; + layout: vertical; + padding: 1 2; + } + + /* --- Firmware tab --- */ + DeviceScreen #dev-fw-controls { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-fw-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + DeviceScreen #dev-fw-controls Input { + width: 12; + margin: 0 1; + } + DeviceScreen #dev-fw-controls Button { + margin: 0 1; + } + DeviceScreen #dev-fw-cpu-controls { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-fw-cpu-controls Button { + margin: 0 1; + } + DeviceScreen #dev-fw-status { + height: auto; + color: #506878; + margin: 0 0 1 0; + } + + /* --- EEPROM tab --- */ + DeviceScreen #dev-ee-read-row { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-ee-read-row Button { + margin: 0 1; + } + DeviceScreen #dev-ee-flash-section { + height: auto; + background: #0e1018; + border: round #1a2a3a; + padding: 1; + margin: 0 0 1 0; + } + DeviceScreen #dev-ee-flash-title { + color: #e8a020; + text-style: bold; + margin: 0 0 1 0; + } + DeviceScreen #dev-ee-flash-controls { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-ee-flash-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + DeviceScreen #dev-ee-flash-controls Input { + width: 1fr; + margin: 0 1; + } + DeviceScreen #dev-ee-flash-controls Button { + margin: 0 1; + } + DeviceScreen #dev-ee-validation { + height: auto; + margin: 0 0 1 0; + } + DeviceScreen #dev-ee-progress-row { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-ee-progress-label { + width: auto; + margin: 0 1 0 0; + color: #506878; + } + DeviceScreen #dev-ee-pbar { + width: 1fr; + } + DeviceScreen #dev-ee-status { + height: auto; + color: #506878; + margin: 0 0 1 0; + } + DeviceScreen #dev-ee-persistent-warn { + height: auto; + color: #e04040; + text-style: bold; + margin: 0 0 1 0; + } + + /* --- Diagnostics tab --- */ + DeviceScreen #dev-diag-boot-row { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-diag-boot-row Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + DeviceScreen #dev-diag-boot-row Button { + margin: 0 1; + } + DeviceScreen #dev-diag-boot-result { + height: auto; + color: #c8d0d8; + margin: 0 0 1 0; + } + DeviceScreen #dev-diag-i2c-row { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-diag-i2c-row Button { + margin: 0 1; + } + DeviceScreen #dev-diag-i2c-result { + height: auto; + color: #c8d0d8; + margin: 0 0 1 0; + } + DeviceScreen #dev-diag-reg-row { + height: auto; + layout: horizontal; + margin: 0 0 1 0; + } + DeviceScreen #dev-diag-reg-row Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + DeviceScreen #dev-diag-reg-row Input { + width: 10; + margin: 0 1; + } + DeviceScreen #dev-diag-reg-row Button { + margin: 0 1; + } + """ + + # Boot diagnostic mode values + _BOOT_MODES = [ + ("0x80 No-op", 0x80), + ("0x81 GPIO init", 0x81), + ("0x82 I2C probe", 0x82), + ("0x83 BCM4500 reset", 0x83), + ("0x84 FW load", 0x84), + ("0x85 Full boot", 0x85), + ] + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._active_tab = "firmware" + self._workers: list[Worker] = [] + + # EEPROM flash state machine + self._flash_state = "idle" + self._flash_image: bytes = b'' + self._flash_header: dict | None = None + self._flash_records: list | None = None + self._eeprom_backup: bytes = b'' + self._verify_failed = False + + def compose(self) -> ComposeResult: + # Identity panel (always visible) + with Vertical(id="dev-identity"): + yield Static("[#00d4aa bold]Device Identity[/]", id="dev-identity-title") + if self._bridge.is_demo: + yield Static("[#e8a020 bold]DEMO MODE \u2014 no hardware writes[/]", + id="dev-demo-badge") + yield Static("[#506878]Loading...[/]", id="dev-identity-info") + yield ConfigBitsDisplay(id="dev-config-bits") + + # Tab buttons + with Horizontal(id="dev-tab-row"): + yield Button("Firmware", id="dev-tab-firmware", + classes="dev-tab-btn -active") + yield Button("EEPROM", id="dev-tab-eeprom", classes="dev-tab-btn") + yield Button("Diagnostics", id="dev-tab-diag", classes="dev-tab-btn") + + # Tab content area -- we toggle visibility manually + with Vertical(id="dev-tab-content"): + # === Firmware tab === + with Vertical(id="dev-panel-firmware", classes="dev-tab-panel"): + with Horizontal(id="dev-fw-controls"): + yield Label("Address (hex):") + yield Input("0x0000", id="dev-fw-addr") + yield Label("Length:") + yield Input("64", id="dev-fw-len") + yield Button("Read RAM", id="dev-fw-read", variant="primary") + with Horizontal(id="dev-fw-cpu-controls"): + yield Button("Halt CPU", id="dev-fw-halt", variant="error") + yield Button("Start CPU", id="dev-fw-start", variant="success") + yield Static("[#506878]Ready[/]", id="dev-fw-status") + yield HexView(id="dev-fw-hex") + + # === EEPROM tab === + with Vertical(id="dev-panel-eeprom", classes="dev-tab-panel"): + with Horizontal(id="dev-ee-read-row"): + yield Button("Read All", id="dev-ee-read", variant="primary") + yield Button("Backup to File", id="dev-ee-backup") + yield Static("", id="dev-ee-persistent-warn") + + # Flash section + with Vertical(id="dev-ee-flash-section"): + yield Static( + "[#e8a020 bold]Flash Firmware Image[/]", + id="dev-ee-flash-title", + ) + with Horizontal(id="dev-ee-flash-controls"): + yield Label("Image file:") + yield Input("", id="dev-ee-filepath", + placeholder="/path/to/firmware.bin") + yield Button("Validate", id="dev-ee-validate") + yield Button("Flash", id="dev-ee-flash", disabled=True, + variant="error") + yield Static("", id="dev-ee-validation") + + with Horizontal(id="dev-ee-progress-row"): + yield Static("[#506878]Idle[/]", id="dev-ee-progress-label") + yield ProgressBar(total=100, show_eta=False, id="dev-ee-pbar") + yield Static("[#506878]Ready[/]", id="dev-ee-status") + yield HexView(id="dev-ee-hex") + + # === Diagnostics tab === + with Vertical(id="dev-panel-diag", classes="dev-tab-panel"): + # Boot test + yield Static("[#00d4aa bold]Boot Diagnostic[/]") + with Horizontal(id="dev-diag-boot-row"): + yield Label("Mode:") + yield Select( + [(label, val) for label, val in self._BOOT_MODES], + value=0x80, + id="dev-diag-boot-mode", + ) + yield Button("Run", id="dev-diag-boot-run", variant="primary") + yield Static("", id="dev-diag-boot-result") + + # I2C scan + yield Static("[#00d4aa bold]I2C Bus Scan[/]") + with Horizontal(id="dev-diag-i2c-row"): + yield Button("Scan", id="dev-diag-i2c-scan", variant="primary") + yield Static("", id="dev-diag-i2c-result") + + # BCM4500 register dump + yield Static("[#00d4aa bold]BCM4500 Register Dump[/]") + with Horizontal(id="dev-diag-reg-row"): + yield Label("Start reg:") + yield Input("0x00", id="dev-diag-reg-start") + yield Label("Count:") + yield Input("32", id="dev-diag-reg-count") + yield Button("Read", id="dev-diag-reg-read", variant="primary") + yield HexView(id="dev-diag-hex") + + # --- Lifecycle --- + + def on_show(self) -> None: + """Read device identity when the screen becomes visible.""" + self._read_identity() + # Hide inactive tabs + self._apply_tab_visibility() + + def on_hide(self) -> None: + """Cancel all running workers when leaving the screen.""" + for w in self._workers: + try: + w.cancel() + except Exception: + pass + self._workers.clear() + + # --- Tab switching --- + + def _switch_tab(self, tab: str) -> None: + """Switch between firmware / eeprom / diag tabs.""" + self._active_tab = tab + # Update button highlights + for name in ("firmware", "eeprom", "diag"): + btn = self.query_one(f"#dev-tab-{name}", Button) + if name == tab: + btn.add_class("-active") + else: + btn.remove_class("-active") + self._apply_tab_visibility() + + def _apply_tab_visibility(self) -> None: + """Show/hide tab panels based on active tab.""" + tab_map = { + "firmware": "dev-panel-firmware", + "eeprom": "dev-panel-eeprom", + "diag": "dev-panel-diag", + } + for name, panel_id in tab_map.items(): + try: + panel = self.query_one(f"#{panel_id}") + panel.display = (name == self._active_tab) + except Exception: + pass + + # --- Button dispatch --- + + def on_button_pressed(self, event: Button.Pressed) -> None: + btn_id = event.button.id or "" + + # Tab buttons + if btn_id == "dev-tab-firmware": + self._switch_tab("firmware") + elif btn_id == "dev-tab-eeprom": + self._switch_tab("eeprom") + elif btn_id == "dev-tab-diag": + self._switch_tab("diag") + + # Firmware tab + elif btn_id == "dev-fw-read": + self._fw_ram_read() + elif btn_id == "dev-fw-halt": + self._fw_cpu_halt() + elif btn_id == "dev-fw-start": + self._fw_cpu_start() + + # EEPROM tab + elif btn_id == "dev-ee-read": + self._ee_read_all() + elif btn_id == "dev-ee-backup": + self._ee_backup() + elif btn_id == "dev-ee-validate": + self._ee_validate_file() + elif btn_id == "dev-ee-flash": + self._ee_begin_flash() + + # Diagnostics tab + elif btn_id == "dev-diag-boot-run": + self._diag_boot_test() + elif btn_id == "dev-diag-i2c-scan": + self._diag_i2c_scan() + elif btn_id == "dev-diag-reg-read": + self._diag_reg_read() + + # --- Countdown timer messages --- + + def on_countdown_timer_completed(self, _msg: CountdownTimer.Completed) -> None: + """Countdown finished -- proceed with EEPROM write.""" + self._remove_countdown() + self._ee_do_write() + + def on_countdown_timer_aborted(self, _msg: CountdownTimer.Aborted) -> None: + """Operator aborted the countdown.""" + self._remove_countdown() + self._flash_state = "idle" + self._set_ee_status("[#e8a020]Flash aborted by operator[/]") + self.query_one("#dev-ee-flash", Button).disabled = False + + def _remove_countdown(self) -> None: + """Remove the countdown widget from the flash section.""" + try: + timer = self.query_one(CountdownTimer) + timer.remove() + except Exception: + pass + + # --- Identity panel --- + + @work(thread=True) + def _read_identity(self) -> None: + """Read device info in a background thread and update the identity panel.""" + try: + fw = self._bridge.get_fw_version() + serial = self._bridge.get_serial_number() + speed = self._bridge.get_usb_speed() + config = self._bridge.get_config() + vendor = self._bridge.get_vendor_string() + product = self._bridge.get_product_string() + except Exception as e: + self.app.call_from_thread( + self._update_identity_error, str(e) + ) + return + + speed_str = {0: "Unknown", 1: "Full (12 Mbps)", 2: "High (480 Mbps)"}.get( + speed, f"0x{speed:02X}" + ) + serial_hex = serial.hex(' ') if isinstance(serial, (bytes, bytearray)) else str(serial) + fw_str = fw.get("version", "?") + fw_date = fw.get("date", "?") + + info_lines = [ + f"[#506878]Firmware:[/] [#c8d0d8]{fw_str}[/] [#506878]({fw_date})[/]", + f"[#506878]Serial:[/] [#c8d0d8]{serial_hex}[/]", + f"[#506878]USB:[/] [#c8d0d8]{speed_str}[/]", + f"[#506878]Vendor:[/] [#c8d0d8]{vendor}[/]", + f"[#506878]Product:[/] [#c8d0d8]{product}[/]", + ] + self.app.call_from_thread(self._update_identity, "\n".join(info_lines), config) + + def _update_identity(self, info_text: str, config: int) -> None: + if not self.is_mounted: + return + self.query_one("#dev-identity-info", Static).update(info_text) + self.query_one("#dev-config-bits", ConfigBitsDisplay).update_config(config) + + def _update_identity_error(self, error: str) -> None: + if not self.is_mounted: + return + self.query_one("#dev-identity-info", Static).update( + f"[bold #e04040]Error reading device info:[/] [#e04040]{error}[/]" + ) + + # ========================================================================== + # Firmware tab operations + # ========================================================================== + + @work(thread=True) + def _fw_ram_read(self) -> None: + """Read FX2 RAM region and display in hex view.""" + try: + addr_str = self.app.call_from_thread( + lambda: self.query_one("#dev-fw-addr", Input).value + ) + len_str = self.app.call_from_thread( + lambda: self.query_one("#dev-fw-len", Input).value + ) + addr = int(addr_str, 0) + length = int(len_str, 0) + length = max(1, min(length, 4096)) + except (ValueError, TypeError): + self.app.call_from_thread( + self._set_fw_status, "[bold #e04040]Invalid address or length[/]" + ) + return + + self.app.call_from_thread( + self._set_fw_status, f"[#506878]Reading 0x{addr:04X}...[/]" + ) + + try: + # Read in chunks of 64 bytes (USB control transfer limit) + data = bytearray() + remaining = length + offset = addr + while remaining > 0: + chunk_size = min(64, remaining) + chunk = self._bridge.fx2_ram_read(offset, chunk_size) + data.extend(chunk) + offset += chunk_size + remaining -= chunk_size + except Exception as e: + self.app.call_from_thread( + self._set_fw_status, + f"[bold #e04040]RAM read failed:[/] [#e04040]{e}[/]", + ) + return + + self.app.call_from_thread(self._show_fw_data, bytes(data), addr, length) + + def _show_fw_data(self, data: bytes, addr: int, length: int) -> None: + if not self.is_mounted: + return + self.query_one("#dev-fw-hex", HexView).set_data(data) + self._set_fw_status( + f"[#00d4aa]Read {len(data)} bytes from 0x{addr:04X}[/]" + ) + + @work(thread=True) + def _fw_cpu_halt(self) -> None: + self.app.call_from_thread( + self._set_fw_status, "[#e8a020]Halting FX2 CPU...[/]" + ) + try: + self._bridge.fx2_cpu_halt() + self.app.call_from_thread( + self._set_fw_status, "[#e04040]CPU halted (CPUCS=0x01)[/]" + ) + except Exception as e: + self.app.call_from_thread( + self._set_fw_status, + f"[bold #e04040]Halt failed:[/] [#e04040]{e}[/]", + ) + + @work(thread=True) + def _fw_cpu_start(self) -> None: + self.app.call_from_thread( + self._set_fw_status, "[#e8a020]Starting FX2 CPU...[/]" + ) + try: + self._bridge.fx2_cpu_start() + self.app.call_from_thread( + self._set_fw_status, "[#00e060]CPU started (CPUCS=0x00)[/]" + ) + except Exception as e: + self.app.call_from_thread( + self._set_fw_status, + f"[bold #e04040]Start failed:[/] [#e04040]{e}[/]", + ) + + def _set_fw_status(self, text: str) -> None: + if not self.is_mounted: + return + self.query_one("#dev-fw-status", Static).update(text) + + # ========================================================================== + # EEPROM tab operations + # ========================================================================== + + @work(thread=True) + def _ee_read_all(self) -> None: + """Read entire EEPROM and display in hex view.""" + self.app.call_from_thread( + self._set_ee_status, "[#506878]Reading EEPROM (16 KB)...[/]" + ) + self.app.call_from_thread(self._set_ee_progress, "Reading...", 0) + + try: + data = bytearray() + chunk_size = 64 + total = MAX_EEPROM_SIZE + for offset in range(0, total, chunk_size): + remaining = min(chunk_size, total - offset) + chunk = self._bridge.eeprom_read(offset, remaining) + data.extend(chunk) + pct = (offset + remaining) / total * 100 + self.app.call_from_thread(self._set_ee_progress, "Reading...", pct) + except Exception as e: + self.app.call_from_thread( + self._set_ee_status, + f"[bold #e04040]EEPROM read failed:[/] [#e04040]{e}[/]", + ) + return + + self.app.call_from_thread(self._show_ee_data, bytes(data)) + + def _show_ee_data(self, data: bytes) -> None: + if not self.is_mounted: + return + self.query_one("#dev-ee-hex", HexView).set_data(data) + self._set_ee_progress("Complete", 100) + # Parse and display C2 header info + header = parse_c2_header(data) + if header: + self._set_ee_status( + f"[#00d4aa]Read {len(data)} bytes[/] \u2014 " + f"C2 VID=0x{header['vid']:04X} PID=0x{header['pid']:04X} " + f"Config=0x{header['config']:02X}" + ) + else: + self._set_ee_status( + f"[#e8a020]Read {len(data)} bytes \u2014 not a valid C2 image[/]" + ) + + @work(thread=True) + def _ee_backup(self) -> None: + """Read EEPROM and save to timestamped .bin file.""" + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"skywalker1_eeprom_{ts}.bin" + + self.app.call_from_thread( + self._set_ee_status, f"[#506878]Backing up to {filename}...[/]" + ) + self.app.call_from_thread(self._set_ee_progress, "Backup...", 0) + + try: + data = bytearray() + chunk_size = 64 + total = MAX_EEPROM_SIZE + for offset in range(0, total, chunk_size): + remaining = min(chunk_size, total - offset) + chunk = self._bridge.eeprom_read(offset, remaining) + data.extend(chunk) + pct = (offset + remaining) / total * 100 + self.app.call_from_thread(self._set_ee_progress, "Backup...", pct) + + with open(filename, 'wb') as f: + f.write(data) + except Exception as e: + self.app.call_from_thread( + self._set_ee_status, + f"[bold #e04040]Backup failed:[/] [#e04040]{e}[/]", + ) + return + + abs_path = os.path.abspath(filename) + self.app.call_from_thread( + self._set_ee_status, + f"[#00d4aa]Backup saved:[/] [#c8d0d8]{abs_path}[/] " + f"[#506878]({len(data)} bytes)[/]", + ) + self.app.call_from_thread(self._set_ee_progress, "Complete", 100) + + # --- C2 image validation --- + + def _ee_validate_file(self) -> None: + """Validate the selected C2 firmware image file.""" + # Block re-entry during active flash operations + if self._flash_state in ("backup", "countdown", "writing", "verifying"): + return + filepath = self.query_one("#dev-ee-filepath", Input).value.strip() + validation = self.query_one("#dev-ee-validation", Static) + flash_btn = self.query_one("#dev-ee-flash", Button) + + if not filepath: + validation.update("[bold #e04040]No file path entered[/]") + flash_btn.disabled = True + self._flash_state = "idle" + return + + if not os.path.exists(filepath): + validation.update(f"[bold #e04040]File not found:[/] [#e04040]{filepath}[/]") + flash_btn.disabled = True + self._flash_state = "idle" + return + + try: + with open(filepath, 'rb') as f: + image = f.read() + except OSError as e: + validation.update(f"[bold #e04040]Cannot read file:[/] [#e04040]{e}[/]") + flash_btn.disabled = True + self._flash_state = "idle" + return + + self._flash_state = "file_selected" + + # Run all C2 validation checks + errors = [] + warnings = [] + + # Size checks + if len(image) > MAX_EEPROM_SIZE: + errors.append( + f"Image too large: {len(image)} bytes (max {MAX_EEPROM_SIZE})" + ) + if len(image) < 12: + errors.append(f"Image too small: {len(image)} bytes (need >= 12)") + + # Magic byte + if not image or image[0] != 0xC2: + errors.append( + f"Not a C2 image (first byte: 0x{image[0]:02X}, expected 0xC2)" + if image else "Empty file" + ) + + if errors: + self._flash_state = "invalid" + flash_btn.disabled = True + err_text = "\n".join(f"[bold #e04040]\u2717 {e}[/]" for e in errors) + validation.update(err_text) + return + + # Parse C2 header + header = parse_c2_header(image) + if header is None: + self._flash_state = "invalid" + flash_btn.disabled = True + validation.update("[bold #e04040]\u2717 Failed to parse C2 header[/]") + return + + # VID/PID match + if header["vid"] != VENDOR_ID: + errors.append( + f"VID mismatch: image 0x{header['vid']:04X}, " + f"expected 0x{VENDOR_ID:04X}" + ) + if header["pid"] != PRODUCT_ID: + errors.append( + f"PID mismatch: image 0x{header['pid']:04X}, " + f"expected 0x{PRODUCT_ID:04X}" + ) + + # Parse records and check for END marker + records = parse_records(image) + end_recs = [r for r in records if r["type"] == "end"] + invalid_recs = [r for r in records if r["type"] == "invalid"] + + if not end_recs: + errors.append("No END marker (0x8001) found in image") + if invalid_recs: + warnings.append(f"{len(invalid_recs)} invalid record(s) in image") + + if errors: + self._flash_state = "invalid" + flash_btn.disabled = True + lines = [f"[bold #e04040]\u2717 {e}[/]" for e in errors] + lines += [f"[#e8a020]\u26a0 {w}[/]" for w in warnings] + validation.update("\n".join(lines)) + return + + # All checks passed + self._flash_state = "validated" + self._flash_image = image + self._flash_header = header + self._flash_records = records + + data_recs = [r for r in records if r["type"] == "data"] + total_code = sum(r["length"] for r in data_recs) + entry = end_recs[0]["entry_point"] if end_recs else 0 + + lines = [ + "[bold #00e060]\u2713 Valid C2 image[/]", + f"[#506878]Size:[/] [#c8d0d8]{len(image)} bytes[/]", + f"[#506878]VID:[/] [#c8d0d8]0x{header['vid']:04X}[/] " + f"[#506878]PID:[/] [#c8d0d8]0x{header['pid']:04X}[/]", + f"[#506878]Code:[/] [#c8d0d8]{total_code} bytes " + f"in {len(data_recs)} segment(s)[/]", + f"[#506878]Entry:[/] [#c8d0d8]0x{entry:04X}[/]", + ] + if warnings: + lines += [f"[#e8a020]\u26a0 {w}[/]" for w in warnings] + + validation.update("\n".join(lines)) + flash_btn.disabled = False + + # --- Flash state machine --- + + def _ee_begin_flash(self) -> None: + """Start the flash sequence: backup -> countdown -> write -> verify.""" + if self._flash_state != "validated": + return + + # In demo mode, show the full flow but skip actual writes + if self._bridge.is_demo: + self._set_ee_status( + "[#e8a020 bold]DEMO MODE \u2014 simulating flash " + "(no hardware writes)[/]" + ) + + self.query_one("#dev-ee-flash", Button).disabled = True + self._flash_state = "backup" + self._ee_flash_backup() + + @work(thread=True) + def _ee_flash_backup(self) -> None: + """Pre-flash backup of current EEPROM contents.""" + self.app.call_from_thread( + self._set_ee_status, "[#506878]Pre-flash backup...[/]" + ) + self.app.call_from_thread(self._set_ee_progress, "Backup...", 0) + + try: + data = bytearray() + chunk_size = 64 + total = MAX_EEPROM_SIZE + for offset in range(0, total, chunk_size): + remaining = min(chunk_size, total - offset) + chunk = self._bridge.eeprom_read(offset, remaining) + data.extend(chunk) + pct = (offset + remaining) / total * 100 + self.app.call_from_thread(self._set_ee_progress, "Backup...", pct) + + self._eeprom_backup = bytes(data) + + # Save backup to file + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_file = f"eeprom_preflash_{ts}.bin" + with open(backup_file, 'wb') as f: + f.write(data) + + abs_path = os.path.abspath(backup_file) + self.app.call_from_thread( + self._set_ee_status, + f"[#00d4aa]Pre-flash backup saved:[/] [#c8d0d8]{abs_path}[/]", + ) + except Exception as e: + self.app.call_from_thread( + self._set_ee_status, + f"[bold #e04040]Backup failed \u2014 flash aborted:[/] " + f"[#e04040]{e}[/]", + ) + self._flash_state = "idle" + self.app.call_from_thread(self._enable_flash_button) + return + + self._flash_state = "countdown" + self.app.call_from_thread(self._show_countdown) + + def _show_countdown(self) -> None: + """Mount the countdown timer widget into the flash section.""" + if not self.is_mounted: + return + flash_section = self.query_one("#dev-ee-flash-section") + timer = CountdownTimer(id="dev-ee-countdown") + flash_section.mount(timer) + timer.start() + + def _ee_do_write(self) -> None: + """Called when countdown completes -- start the actual write.""" + self._flash_state = "writing" + w = self._ee_write_and_verify() + self._workers.append(w) + + @work(thread=True) + def _ee_write_and_verify(self) -> None: + """Write the firmware image to EEPROM, then verify.""" + image = self._flash_image + total_pages = (len(image) + EEPROM_PAGE_SIZE - 1) // EEPROM_PAGE_SIZE + write_errors = 0 + consecutive_errors = 0 + write_aborted = False + max_consecutive_errors = 3 + + self.app.call_from_thread( + self._set_ee_status, + f"[#e8a020 bold]Writing {len(image)} bytes " + f"({total_pages} pages)...[/]", + ) + + for page_num in range(total_pages): + offset = page_num * EEPROM_PAGE_SIZE + end = min(offset + EEPROM_PAGE_SIZE, len(image)) + chunk = image[offset:end] + pct = (page_num + 1) / total_pages * 100 + + self.app.call_from_thread( + self._set_ee_progress, + f"Writing 0x{offset:04X}...", pct, + ) + + try: + written = self._bridge.eeprom_write_page(offset, chunk) + if written != len(chunk): + write_errors += 1 + consecutive_errors += 1 + else: + consecutive_errors = 0 + except Exception: + write_errors += 1 + consecutive_errors += 1 + + if consecutive_errors >= max_consecutive_errors: + write_aborted = True + self.app.call_from_thread( + self._set_ee_status, + f"[bold #e04040]WRITE ABORTED after " + f"{consecutive_errors} consecutive errors " + f"at 0x{offset:04X}[/]", + ) + break + + # Wait for EEPROM internal write cycle + time.sleep(EEPROM_WRITE_CYCLE_MS / 1000.0) + + if write_aborted: + self._flash_state = "error" + self._verify_failed = True + self.app.call_from_thread( + self._show_verify_error, + f"Write aborted at page {page_num}/{total_pages} " + f"({write_errors} total errors)", + set(), + ) + return + + if write_errors: + self.app.call_from_thread( + self._set_ee_status, + f"[bold #e04040]Write completed with {write_errors} error(s)[/]", + ) + + # --- Verify phase --- + self._flash_state = "verifying" + self.app.call_from_thread( + self._set_ee_status, + f"[#506878]Verifying {len(image)} bytes...[/]", + ) + + try: + verify_data = bytearray() + chunk_size = 64 + for offset in range(0, len(image), chunk_size): + remaining = min(chunk_size, len(image) - offset) + chunk = self._bridge.eeprom_read(offset, remaining) + verify_data.extend(chunk) + pct = (offset + remaining) / len(image) * 100 + self.app.call_from_thread( + self._set_ee_progress, "Verifying...", pct, + ) + except Exception as e: + self._flash_state = "error" + self._verify_failed = True + self.app.call_from_thread( + self._show_verify_error, + f"Verify read failed: {e}", + set(), + ) + return + + # Check total length before comparing + if len(verify_data) != len(image): + self._flash_state = "error" + self._verify_failed = True + self.app.call_from_thread( + self._show_verify_error, + f"Read-back size mismatch: expected {len(image)} bytes, " + f"got {len(verify_data)}", + set(), + ) + return + + # Byte-by-byte comparison + mismatches: set[int] = set() + for i in range(len(image)): + if image[i] != verify_data[i]: + mismatches.add(i) + + if not mismatches: + self._flash_state = "done" + self._verify_failed = False + self.app.call_from_thread(self._show_verify_success, bytes(verify_data)) + else: + self._flash_state = "error" + self._verify_failed = True + self.app.call_from_thread( + self._show_verify_error, + f"{len(mismatches)} byte(s) differ", + mismatches, + bytes(verify_data), + ) + + def _show_verify_success(self, data: bytes) -> None: + """Flash + verify succeeded.""" + if not self.is_mounted: + return + self.query_one("#dev-ee-hex", HexView).set_data(data) + self._set_ee_progress("Complete", 100) + self._set_ee_status( + f"[bold #00e060]\u2713 VERIFIED \u2014 all {len(data)} bytes match[/]\n" + f"[#506878]Power cycle the device to boot new firmware.[/]" + ) + self.query_one("#dev-ee-persistent-warn", Static).update("") + self.query_one("#dev-ee-flash", Button).disabled = False + + def _show_verify_error(self, detail: str, mismatches: set[int], + verify_data: bytes | None = None) -> None: + """Flash verify FAILED -- show persistent warning.""" + if not self.is_mounted: + return + if verify_data: + self.query_one("#dev-ee-hex", HexView).set_data( + verify_data, diff_offsets=mismatches, + ) + self._set_ee_progress("FAILED", 100) + self._set_ee_status( + f"[bold #e04040]\u2717 VERIFY FAILED \u2014 {detail}[/]" + ) + # Persistent, impossible-to-miss warning + self.query_one("#dev-ee-persistent-warn", Static).update( + "[bold #e04040 on #1a0000]" + "\u26a0\u26a0\u26a0 DO NOT POWER CYCLE \u26a0\u26a0\u26a0\n" + "EEPROM contents do not match the image.\n" + "Power cycling now may brick the device.\n" + "Resolve this before removing power." + "[/]" + ) + self.query_one("#dev-ee-flash", Button).disabled = False + + def _enable_flash_button(self) -> None: + if not self.is_mounted: + return + self.query_one("#dev-ee-flash", Button).disabled = False + + # --- EEPROM UI helpers --- + + def _set_ee_status(self, text: str) -> None: + if not self.is_mounted: + return + self.query_one("#dev-ee-status", Static).update(text) + + def _set_ee_progress(self, label: str, pct: float) -> None: + if not self.is_mounted: + return + self.query_one("#dev-ee-progress-label", Static).update( + f"[#506878]{label}[/]" + ) + self.query_one("#dev-ee-pbar", ProgressBar).update(progress=pct) + + # ========================================================================== + # Diagnostics tab operations + # ========================================================================== + + @work(thread=True) + def _diag_boot_test(self) -> None: + """Run boot diagnostic with selected mode.""" + try: + mode = self.app.call_from_thread( + lambda: self.query_one("#dev-diag-boot-mode", Select).value + ) + if mode is Select.BLANK: + mode = 0x80 + except Exception: + mode = 0x80 + + self.app.call_from_thread( + self._set_diag_boot_result, + f"[#506878]Running boot test mode 0x{mode:02X}...[/]", + ) + + try: + result = self._bridge.boot_debug(mode) + except Exception as e: + self.app.call_from_thread( + self._set_diag_boot_result, + f"[bold #e04040]Boot test failed:[/] [#e04040]{e}[/]", + ) + return + + stage = result.get("stage", 0) + res = result.get("result", 0) + detail = result.get("detail", 0) + + if res == 0x01: + status_str = "[bold #00e060]PASS[/]" + elif res == 0x00: + status_str = "[#e8a020]NO RESULT[/]" + else: + status_str = f"[bold #e04040]FAIL (0x{res:02X})[/]" + + self.app.call_from_thread( + self._set_diag_boot_result, + f"Stage: 0x{stage:02X} Result: {status_str} " + f"Detail: 0x{detail:02X}", + ) + + def _set_diag_boot_result(self, text: str) -> None: + if not self.is_mounted: + return + self.query_one("#dev-diag-boot-result", Static).update(text) + + @work(thread=True) + def _diag_i2c_scan(self) -> None: + """Scan I2C bus for responding devices.""" + self.app.call_from_thread( + self._set_diag_i2c_result, + "[#506878]Scanning I2C bus...[/]", + ) + + try: + addresses = self._bridge.i2c_bus_scan() + except Exception as e: + self.app.call_from_thread( + self._set_diag_i2c_result, + f"[bold #e04040]I2C scan failed:[/] [#e04040]{e}[/]", + ) + return + + if not addresses: + self.app.call_from_thread( + self._set_diag_i2c_result, + "[#e8a020]No devices found on I2C bus[/]", + ) + return + + # Known device identification + known = { + 0x08: "BCM4500 demodulator", + 0x51: "Boot EEPROM (24Cxx)", + 0x60: "BCM3440 tuner", + 0x61: "ISL6421 LNB controller", + } + parts = [] + for addr in addresses: + label = known.get(addr, "") + if label: + parts.append( + f"[#00d4aa]0x{addr:02X}[/] [#506878]({label})[/]" + ) + else: + parts.append(f"[#00d4aa]0x{addr:02X}[/]") + + self.app.call_from_thread( + self._set_diag_i2c_result, + f"[#c8d0d8]Found {len(addresses)} device(s):[/] " + + " ".join(parts), + ) + + def _set_diag_i2c_result(self, text: str) -> None: + if not self.is_mounted: + return + self.query_one("#dev-diag-i2c-result", Static).update(text) + + @work(thread=True) + def _diag_reg_read(self) -> None: + """Read BCM4500 registers and display in hex view.""" + try: + start_str = self.app.call_from_thread( + lambda: self.query_one("#dev-diag-reg-start", Input).value + ) + count_str = self.app.call_from_thread( + lambda: self.query_one("#dev-diag-reg-count", Input).value + ) + start_reg = int(start_str, 0) + count = int(count_str, 0) + count = max(1, min(count, 256)) + except (ValueError, TypeError): + self.app.call_from_thread( + self._set_diag_i2c_result, + "[bold #e04040]Invalid register address or count[/]", + ) + return + + try: + # Read in batches of 64 (USB transfer limit) + data = bytearray() + remaining = count + reg = start_reg + while remaining > 0: + batch = min(64, remaining) + chunk = self._bridge.multi_reg_read(reg, batch) + data.extend(chunk) + reg += batch + remaining -= batch + except Exception as e: + self.app.call_from_thread( + self._set_diag_i2c_result, + f"[bold #e04040]Register read failed:[/] [#e04040]{e}[/]", + ) + return + + self.app.call_from_thread( + self._show_diag_regs, bytes(data), start_reg, count, + ) + + def _show_diag_regs(self, data: bytes, start_reg: int, count: int) -> None: + if not self.is_mounted: + return + self.query_one("#dev-diag-hex", HexView).set_data(data) diff --git a/tui/src/skywalker_tui/screens/stream.py b/tui/src/skywalker_tui/screens/stream.py new file mode 100644 index 0000000..cc3c4a7 --- /dev/null +++ b/tui/src/skywalker_tui/screens/stream.py @@ -0,0 +1,401 @@ +"""Stream screen -- live MPEG-2 transport stream capture and analysis. + +Reads raw TS data from the SkyWalker-1 bulk endpoint, parses 188-byte +packets in real time, and displays PID distribution statistics alongside +a hierarchical PSI (PAT/PMT) program structure tree. + +Supports file capture mode for saving raw .ts files to disk. + +arm_transfer(on) is always paired in try/finally to guarantee the USB +bulk endpoint is disarmed when monitoring stops. +""" + +import time + +from textual.app import ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Label, Input, Button, Static +from textual import work +from textual.worker import Worker + +from ts_analyze import ( + TSPacket, PSIParser, parse_pat, parse_pmt, + KNOWN_PIDS, TS_PACKET_SIZE, +) + +from skywalker_tui.widgets.pid_table import PidTable +from skywalker_tui.widgets.psi_tree import PsiTree + + +class StreamScreen(Container): + """Live MPEG-2 TS monitor with PID analysis and PSI tree.""" + + DEFAULT_CSS = """ + StreamScreen { + layout: vertical; + } + StreamScreen #stream-main { + height: 1fr; + layout: horizontal; + } + StreamScreen #stream-pid-col { + width: 3fr; + height: 1fr; + padding: 1; + } + StreamScreen #stream-psi-col { + width: 2fr; + height: 1fr; + padding: 1; + } + StreamScreen #stream-controls { + height: auto; + padding: 1 2; + background: #0e1018; + border-top: solid #1a2a3a; + layout: horizontal; + } + StreamScreen #stream-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + StreamScreen #stream-controls Input { + width: 14; + margin: 0 1; + } + StreamScreen #stream-controls Button { + margin: 0 1; + } + StreamScreen #stream-stats { + height: 3; + layout: horizontal; + padding: 0 2; + } + StreamScreen #stream-stats Static { + width: 1fr; + height: 3; + content-align: center middle; + background: #121c2a; + border: round #1a3050; + margin: 0 1 0 0; + } + """ + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._monitoring = False + self._capturing = False + self._capture_file = None + self._monitor_worker: Worker | None = None + + # Accumulated stats (written by worker thread, read by UI thread) + self._total_packets = 0 + self._total_bytes = 0 + self._tei_count = 0 + self._pid_counts: dict[int, int] = {} + self._cc_last: dict[int, int] = {} + self._cc_errors: dict[int, int] = {} + self._start_time = 0.0 + + def compose(self) -> ComposeResult: + with Horizontal(id="stream-main"): + with Vertical(id="stream-pid-col"): + yield Static( + "[bold #00d4aa]PID Distribution[/]", classes="panel-title" + ) + yield PidTable(id="stream-pid-table") + with Vertical(id="stream-psi-col"): + yield Static( + "[bold #00d4aa]Program Structure[/]", classes="panel-title" + ) + yield PsiTree(id="stream-psi-tree") + with Horizontal(id="stream-stats"): + yield Static("[#506878]Packets:[/] [#00d4aa]0[/]", id="stat-packets") + yield Static("[#506878]Bytes:[/] [#00d4aa]0[/]", id="stat-bytes") + yield Static("[#506878]PIDs:[/] [#00d4aa]0[/]", id="stat-pids") + yield Static("[#506878]CC Errors:[/] [#00d4aa]0[/]", id="stat-cc") + yield Static( + "[#506878]Duration:[/] [#00d4aa]0.0s[/]", id="stat-duration" + ) + with Horizontal(id="stream-controls"): + yield Button("Start Monitor", id="stream-start", variant="success") + yield Button("Stop", id="stream-stop", variant="error") + yield Label("Capture:") + yield Input("capture.ts", id="stream-capture-file") + yield Button("Capture", id="stream-capture", variant="warning") + yield Static("[#506878]Idle[/]", id="stream-status") + + def on_show(self) -> None: + """Auto-start monitoring in demo mode when this screen becomes visible.""" + if self._bridge.is_demo and not self._monitoring: + self._start_monitor() + + def on_hide(self) -> None: + """Stop monitoring when navigating away from this screen.""" + self._stop_monitor() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "stream-start": + self._start_monitor() + elif event.button.id == "stream-stop": + self._stop_monitor() + elif event.button.id == "stream-capture": + self._toggle_capture() + + # -- Monitor lifecycle -- + + def _start_monitor(self) -> None: + """Begin streaming from the device bulk endpoint.""" + if self._monitoring: + return + self._monitoring = True + self._reset_stats() + self._start_time = time.monotonic() + try: + self.query_one("#stream-status", Static).update( + "[bold #00d4aa]Monitoring[/]" + ) + except Exception: + pass + self._monitor_worker = self._do_monitor() + + def _stop_monitor(self) -> None: + """Stop the monitor worker and clean up capture state.""" + self._monitoring = False + self._capturing = False + # Cancel worker BEFORE closing file — worker may still be writing + if self._monitor_worker is not None: + self._monitor_worker.cancel() + self._monitor_worker = None + if self._capture_file is not None: + self._capture_file.close() + self._capture_file = None + try: + self.query_one("#stream-status", Static).update( + "[#506878]Stopped[/]" + ) + except Exception: + pass + + def _toggle_capture(self) -> None: + """Toggle file capture on/off. Starts monitor if not already running.""" + if self._capturing: + self._capturing = False + if self._capture_file is not None: + self._capture_file.close() + self._capture_file = None + try: + self.query_one("#stream-capture", Button).label = "Capture" + except Exception: + pass + return + + # Start capture + if not self._monitoring: + self._start_monitor() + filename = self.query_one("#stream-capture-file", Input).value or "capture.ts" + try: + self._capture_file = open(filename, "wb") + self._capturing = True + self.query_one("#stream-capture", Button).label = "Stop Capture" + except OSError as e: + self.app.notify(f"Cannot open {filename}: {e}", severity="error") + + def _reset_stats(self) -> None: + """Zero all counters and clear display widgets.""" + self._total_packets = 0 + self._total_bytes = 0 + self._tei_count = 0 + self._pid_counts.clear() + self._cc_last.clear() + self._cc_errors.clear() + try: + self.query_one("#stream-pid-table", PidTable).clear_table() + self.query_one("#stream-psi-tree", PsiTree).clear_tree() + except Exception: + pass + + # -- Background worker -- + + @work(thread=True) + def _do_monitor(self) -> None: + """Background worker: read TS stream, parse packets, post UI updates. + + Runs in a dedicated thread via Textual's @work(thread=True) decorator. + Calls arm_transfer(True) on entry and arm_transfer(False) in finally + to guarantee the bulk endpoint is always disarmed on exit. + """ + psi_pat = PSIParser() + psi_pmt = PSIParser() + pat = None + pmt_pids: set[int] = set() + last_ui_update = 0.0 + + try: + self._bridge.ensure_booted() + self._bridge.arm_transfer(True) + + while self._monitoring: + data = self._bridge.read_stream(8192, timeout=1000) + if not data: + time.sleep(0.05) + continue + + self._total_bytes += len(data) + + # Write raw data to capture file if active + if self._capturing and self._capture_file is not None: + try: + self._capture_file.write(data) + except OSError: + self._capturing = False + + # Parse 188-byte TS packets from the chunk + offset = 0 + while offset + TS_PACKET_SIZE <= len(data): + if data[offset] != 0x47: + # Lost sync, scan forward for next sync byte + offset += 1 + continue + + try: + pkt = TSPacket(data[offset:offset + TS_PACKET_SIZE]) + except (ValueError, IndexError): + offset += 1 + continue + + offset += TS_PACKET_SIZE + self._total_packets += 1 + pid = pkt.pid + + # PID counting + self._pid_counts[pid] = self._pid_counts.get(pid, 0) + 1 + + # TEI (Transport Error Indicator) check + if pkt.tei: + self._tei_count += 1 + + # Continuity counter check (payload-bearing, non-null PIDs) + if pkt.adaptation & 0x01 and pid != 0x1FFF: + if pid in self._cc_last: + expected = (self._cc_last[pid] + 1) & 0x0F + if (pkt.continuity != expected + and pkt.continuity != self._cc_last[pid]): + self._cc_errors[pid] = ( + self._cc_errors.get(pid, 0) + 1 + ) + self._cc_last[pid] = pkt.continuity + + # PAT parsing (PID 0x0000) + if pid == 0x0000: + section = psi_pat.feed(pkt) + if section is not None: + parsed = parse_pat(section) + if parsed is not None: + pat = parsed + for prog_num, pmt_pid in pat.get( + "programs", {} + ).items(): + if prog_num != 0: + pmt_pids.add(pmt_pid) + + # PMT parsing (PIDs discovered from PAT) + if pid in pmt_pids: + section = psi_pmt.feed(pkt) + if section is not None: + parsed = parse_pmt(section) + if parsed is not None: + self.app.call_from_thread( + self._update_pmt, pid, parsed + ) + + # Batch UI updates every 500ms to avoid flooding the event loop + now = time.monotonic() + if now - last_ui_update >= 0.5: + last_ui_update = now + self.app.call_from_thread( + self._update_ui, + pat, + dict(self._pid_counts), + dict(self._cc_errors), + self._total_packets, + self._total_bytes, + self._tei_count, + ) + + finally: + for _attempt in range(2): + try: + self._bridge.arm_transfer(False) + break + except Exception: + time.sleep(0.1) + + # -- UI update methods (called from main thread) -- + + def _update_ui( + self, + pat: dict | None, + pid_counts: dict[int, int], + cc_errors: dict[int, int], + total_pkts: int, + total_bytes: int, + tei_count: int, + ) -> None: + """Push accumulated stats to display widgets.""" + if not self.is_mounted: + return + + # Build known PID names by merging standard table with PAT-discovered PMTs + known = dict(KNOWN_PIDS) + if pat: + for prog_num, pmt_pid in pat.get("programs", {}).items(): + if prog_num == 0: + known[pmt_pid] = "NIT" + else: + known[pmt_pid] = f"PMT (prog {prog_num})" + + self.query_one("#stream-pid-table", PidTable).update_pids( + pid_counts, cc_errors, total_pkts, known + ) + + if pat: + self.query_one("#stream-psi-tree", PsiTree).update_pat(pat) + + total_cc = sum(cc_errors.values()) + duration = time.monotonic() - self._start_time + + self.query_one("#stat-packets", Static).update( + f"[#506878]Packets:[/] [#00d4aa]{total_pkts:,}[/]" + ) + + if total_bytes >= 1_000_000: + bytes_str = f"{total_bytes / 1_000_000:.1f} MB" + elif total_bytes >= 1_000: + bytes_str = f"{total_bytes / 1_000:.1f} KB" + else: + bytes_str = str(total_bytes) + self.query_one("#stat-bytes", Static).update( + f"[#506878]Bytes:[/] [#00d4aa]{bytes_str}[/]" + ) + + self.query_one("#stat-pids", Static).update( + f"[#506878]PIDs:[/] [#00d4aa]{len(pid_counts)}[/]" + ) + + cc_color = "#e04040" if total_cc > 0 else "#00d4aa" + self.query_one("#stat-cc", Static).update( + f"[#506878]CC Errors:[/] [{cc_color}]{total_cc}[/]" + ) + + self.query_one("#stat-duration", Static).update( + f"[#506878]Duration:[/] [#00d4aa]{duration:.1f}s[/]" + ) + + def _update_pmt(self, pmt_pid: int, pmt: dict) -> None: + """Push a newly parsed PMT to the PSI tree widget.""" + if not self.is_mounted: + return + self.query_one("#stream-psi-tree", PsiTree).update_pmt(pmt_pid, pmt) diff --git a/tui/src/skywalker_tui/theme.tcss b/tui/src/skywalker_tui/theme.tcss index 10916d0..e566d66 100644 --- a/tui/src/skywalker_tui/theme.tcss +++ b/tui/src/skywalker_tui/theme.tcss @@ -339,6 +339,36 @@ SplashScreen #splash-image { content-align: center middle; } +/* ─── Hex view ─── */ + +HexView { + min-height: 6; +} + +/* ─── PID table ─── */ + +PidTable { + min-height: 8; +} + +/* ─── PSI tree ─── */ + +PsiTree { + min-height: 8; +} + +/* ─── Countdown timer ─── */ + +CountdownTimer { + margin: 1 0; +} + +/* ─── Config bits display ─── */ + +ConfigBitsDisplay { + height: auto; +} + /* ─── Star Wars overlay ─── */ StarWarsScreen { diff --git a/tui/src/skywalker_tui/widgets/config_bits.py b/tui/src/skywalker_tui/widgets/config_bits.py new file mode 100644 index 0000000..0f419a8 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/config_bits.py @@ -0,0 +1,46 @@ +"""Config byte flag display with colored indicators. + +Renders the 8PSK config byte as a horizontal row of labeled flags, +each shown as a filled (set) or hollow (clear) circle with color coding. +""" + +from textual.widget import Widget +from textual.widgets import Static +from textual.app import ComposeResult + +from skywalker_lib import CONFIG_BITS + + +class ConfigBitsDisplay(Widget): + """Renders the 8PSK config byte as labeled flags with colored indicators.""" + + DEFAULT_CSS = """ + ConfigBitsDisplay { + height: auto; + padding: 0 1; + } + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._config = 0 + + def compose(self) -> ComposeResult: + yield Static("", id="config-bits-content") + + def update_config(self, config: int) -> None: + """Update the displayed config byte value.""" + self._config = config + self._refresh() + + def _refresh(self) -> None: + if not self.is_mounted: + return + parts = [] + for bit_mask, (name, _field) in CONFIG_BITS.items(): + is_set = bool(self._config & bit_mask) + if is_set: + parts.append(f"[bold #00e060]\u25cf[/] [#c8d0d8]{name}[/]") + else: + parts.append(f"[#3a3a3a]\u25cb[/] [#506878]{name}[/]") + self.query_one("#config-bits-content", Static).update(" ".join(parts)) diff --git a/tui/src/skywalker_tui/widgets/countdown_timer.py b/tui/src/skywalker_tui/widgets/countdown_timer.py new file mode 100644 index 0000000..e601e3d --- /dev/null +++ b/tui/src/skywalker_tui/widgets/countdown_timer.py @@ -0,0 +1,112 @@ +"""Countdown timer widget with ABORT button for safety-critical operations. + +Used before EEPROM write to give the operator 3 seconds to abort. +The ABORT button is auto-focused on mount so a single Enter/Space press cancels. +""" + +import time + +from textual.widget import Widget +from textual.widgets import Button, Static, ProgressBar +from textual.app import ComposeResult +from textual.message import Message +from textual import work + + +class CountdownTimer(Widget): + """3-second countdown with prominent ABORT button. + + Posts CountdownTimer.Completed when the countdown finishes, or + CountdownTimer.Aborted if the operator presses ABORT. + """ + + class Completed(Message): + """Fired when countdown finishes without abort.""" + pass + + class Aborted(Message): + """Fired when user presses ABORT.""" + pass + + DEFAULT_CSS = """ + CountdownTimer { + height: auto; + background: #1a0a0a; + border: round #e04040; + padding: 1 2; + } + CountdownTimer #countdown-label { + text-align: center; + color: #e8a020; + text-style: bold; + margin: 0 0 1 0; + } + CountdownTimer #countdown-bar { + margin: 0 0 1 0; + } + CountdownTimer #countdown-abort { + width: 100%; + min-height: 3; + background: #e04040; + color: #ffffff; + text-style: bold; + border: round #ff6060; + } + CountdownTimer #countdown-abort:hover { + background: #ff4040; + } + CountdownTimer #countdown-abort:focus { + background: #ff2020; + border: round #ffffff; + } + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._running = False + self._aborted = False + + def compose(self) -> ComposeResult: + yield Static("EEPROM WRITE in 3...", id="countdown-label") + yield ProgressBar( + total=30, show_eta=False, show_percentage=False, id="countdown-bar" + ) + yield Button("ABORT", id="countdown-abort", variant="error") + + def on_mount(self) -> None: + self.query_one("#countdown-abort", Button).focus() + + def start(self) -> None: + """Begin the 3-second countdown. Call after mounting.""" + self._running = True + self._aborted = False + self._do_countdown() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "countdown-abort": + self._aborted = True + self._running = False + self.post_message(self.Aborted()) + + @work(thread=True) + def _do_countdown(self) -> None: + """Tick at 100ms intervals for smooth progress bar animation.""" + for tick in range(30, -1, -1): + if not self._running or self._aborted: + return + secs = tick / 10 + self.app.call_from_thread(self._update_display, secs, 30 - tick) + time.sleep(0.1) + if self._running and not self._aborted: + self.app.call_from_thread(self._fire_completed) + + def _update_display(self, secs: float, progress: int) -> None: + if not self.is_mounted: + return + label = self.query_one("#countdown-label", Static) + label.update(f"EEPROM WRITE in {secs:.1f}s \u2014 press ABORT to cancel") + self.query_one("#countdown-bar", ProgressBar).update(progress=progress) + + def _fire_completed(self) -> None: + if self.is_mounted and not self._aborted: + self.post_message(self.Completed()) diff --git a/tui/src/skywalker_tui/widgets/hex_view.py b/tui/src/skywalker_tui/widgets/hex_view.py new file mode 100644 index 0000000..1be8faf --- /dev/null +++ b/tui/src/skywalker_tui/widgets/hex_view.py @@ -0,0 +1,90 @@ +"""Scrollable hex dump widget with diff byte highlighting.""" + +from textual.widget import Widget +from textual.widgets import Static +from textual.app import ComposeResult +from textual.containers import VerticalScroll + + +class HexView(Widget): + """Displays a hex dump of binary data with optional diff highlighting. + + Set data via set_data(), optionally passing a set of byte offsets to + highlight in red (for verify mismatches). Each row shows 16 bytes in + traditional offset : hex : ASCII layout. + """ + + DEFAULT_CSS = """ + HexView { + height: 1fr; + min-height: 6; + background: #0e1420; + border: round #1a2a3a; + } + HexView #hex-scroll { + height: 1fr; + padding: 0 1; + } + HexView #hex-content { + width: auto; + } + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._data = b'' + self._diff_offsets: set[int] = set() + + def compose(self) -> ComposeResult: + with VerticalScroll(id="hex-scroll"): + yield Static("", id="hex-content") + + def set_data(self, data: bytes, diff_offsets: set[int] | None = None) -> None: + """Set data to display. diff_offsets highlights those bytes in red.""" + self._data = data + self._diff_offsets = diff_offsets or set() + self._refresh_display() + + def clear(self) -> None: + """Clear the hex display.""" + self._data = b'' + self._diff_offsets.clear() + if self.is_mounted: + self.query_one("#hex-content", Static).update("") + + def _refresh_display(self) -> None: + if not self.is_mounted: + return + lines = [] + for row_off in range(0, len(self._data), 16): + row = self._data[row_off:row_off + 16] + # Offset column + line = f"[#506878]{row_off:04X}:[/] " + # Hex bytes + hex_parts = [] + for i, b in enumerate(row): + abs_off = row_off + i + if abs_off in self._diff_offsets: + hex_parts.append(f"[bold #e04040]{b:02X}[/]") + else: + hex_parts.append(f"[#7090a8]{b:02X}[/]") + line += " ".join(hex_parts) + # Pad if short row + if len(row) < 16: + line += " " * (16 - len(row)) + # ASCII column + line += " " + ascii_parts = [] + for i, b in enumerate(row): + abs_off = row_off + i + ch = chr(b) if 0x20 <= b < 0x7F else "." + if abs_off in self._diff_offsets: + ascii_parts.append(f"[bold #e04040]{ch}[/]") + else: + ascii_parts.append(f"[#506878]{ch}[/]") + line += "".join(ascii_parts) + lines.append(line) + + self.query_one("#hex-content", Static).update( + "\n".join(lines) if lines else "[#506878]No data[/]" + ) diff --git a/tui/src/skywalker_tui/widgets/pid_table.py b/tui/src/skywalker_tui/widgets/pid_table.py new file mode 100644 index 0000000..b500bd5 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/pid_table.py @@ -0,0 +1,74 @@ +"""DataTable wrapper for MPEG-2 TS PID distribution statistics. + +Displays per-PID packet counts, percentage share, continuity counter errors, +and well-known PID names. Table is rebuilt on each update to keep the sort +order stable (by PID number ascending). +""" + +from textual.widget import Widget +from textual.widgets import DataTable +from textual.app import ComposeResult + + +class PidTable(Widget): + """Sortable PID statistics table for transport stream analysis.""" + + DEFAULT_CSS = """ + PidTable { + height: 1fr; + min-height: 8; + } + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._pid_data: dict[int, dict] = {} + self._total_packets = 0 + + def compose(self) -> ComposeResult: + table = DataTable(id="pid-stats-table") + table.cursor_type = "row" + yield table + + def on_mount(self) -> None: + table = self.query_one("#pid-stats-table", DataTable) + for col in ["PID", "Count", "%", "CC Errors", "Name"]: + table.add_column(col, key=col) + + def update_pids( + self, + pid_counts: dict[int, int], + cc_errors: dict[int, int], + total: int, + known_pids: dict[int, str], + ) -> None: + """Rebuild the table from accumulated stats. + + Args: + pid_counts: Mapping of PID number to total packet count. + cc_errors: Mapping of PID number to continuity counter error count. + total: Total packet count across all PIDs. + known_pids: Mapping of PID number to human-readable name. + """ + self._total_packets = total + table = self.query_one("#pid-stats-table", DataTable) + table.clear() + + for pid in sorted(pid_counts.keys()): + count = pid_counts[pid] + pct = (count / total * 100) if total > 0 else 0.0 + cc_err = cc_errors.get(pid, 0) + name = known_pids.get(pid, "") + table.add_row( + f"0x{pid:04X}", + f"{count:,}", + f"{pct:.1f}%", + str(cc_err) if cc_err > 0 else "-", + name, + ) + + def clear_table(self) -> None: + """Remove all rows and reset internal state.""" + self._pid_data.clear() + self._total_packets = 0 + self.query_one("#pid-stats-table", DataTable).clear() diff --git a/tui/src/skywalker_tui/widgets/psi_tree.py b/tui/src/skywalker_tui/widgets/psi_tree.py new file mode 100644 index 0000000..65670d3 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/psi_tree.py @@ -0,0 +1,111 @@ +"""Tree-style display of MPEG-2 PSI structure (PAT/PMT). + +Renders a hierarchical view of the Program Association Table and its +child Program Map Tables using Rich markup inside a Static widget. +Shows transport stream ID, program numbers, PMT PIDs, PCR PIDs, +and elementary stream types with their PIDs. +""" + +from textual.widget import Widget +from textual.widgets import Static +from textual.app import ComposeResult + + +class PsiTree(Widget): + """Hierarchical PAT/PMT display for transport stream program structure.""" + + DEFAULT_CSS = """ + PsiTree { + height: 1fr; + min-height: 8; + background: #0e1420; + border: round #1a2a3a; + padding: 1; + overflow-y: auto; + } + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._pat: dict | None = None + self._pmts: dict[int, dict] = {} + + def compose(self) -> ComposeResult: + yield Static( + "[#506878]Waiting for PSI data...[/]", id="psi-content" + ) + + def update_pat(self, pat: dict) -> None: + """Update the Program Association Table and redraw.""" + self._pat = pat + self._refresh() + + def update_pmt(self, pmt_pid: int, pmt: dict) -> None: + """Update a Program Map Table and redraw.""" + self._pmts[pmt_pid] = pmt + self._refresh() + + def clear_tree(self) -> None: + """Reset all PSI state and show placeholder.""" + self._pat = None + self._pmts.clear() + if self.is_mounted: + self.query_one("#psi-content", Static).update( + "[#506878]Waiting for PSI data...[/]" + ) + + def _refresh(self) -> None: + """Rebuild the tree markup from current PAT/PMT data.""" + if not self.is_mounted: + return + + lines: list[str] = [] + + if self._pat: + tsid = self._pat.get("transport_stream_id", 0) + ver = self._pat.get("version", 0) + lines.append( + f"[bold #00d4aa]PAT[/] [#506878]TSID=0x{tsid:04X} v{ver}[/]" + ) + + programs = self._pat.get("programs", {}) + # Separate NIT (program 0) from real programs + real_progs = {k: v for k, v in programs.items() if k != 0} + + for prog_num, pmt_pid in sorted(programs.items()): + if prog_num == 0: + lines.append( + f" [#7090a8]\u251c\u2500[/] [#506878]NIT[/] " + f"PID=0x{pmt_pid:04X}" + ) + else: + is_last = prog_num == max(real_progs.keys()) + prefix = "\u2514\u2500" if is_last else "\u251c\u2500" + lines.append( + f" [#7090a8]{prefix}[/] " + f"[bold #c8d0d8]Program {prog_num}[/] " + f"PMT=0x{pmt_pid:04X}" + ) + # Expand PMT details if available + if pmt_pid in self._pmts: + pmt = self._pmts[pmt_pid] + pcr_pid = pmt.get("pcr_pid", 0) + indent = " " if is_last else "\u2502 " + lines.append( + f" {indent} [#506878]PCR PID=0x{pcr_pid:04X}[/]" + ) + streams = pmt.get("streams", []) + for j, s in enumerate(streams): + s_last = j == len(streams) - 1 + s_prefix = "\u2514\u2500" if s_last else "\u251c\u2500" + type_name = s.get("type_name", "Unknown") + epid = s.get("elementary_pid", 0) + lines.append( + f" {indent} [#7090a8]{s_prefix}[/] " + f"[#c8d0d8]{type_name}[/] " + f"PID=0x{epid:04X}" + ) + else: + lines.append("[#506878]No PAT received yet[/]") + + self.query_one("#psi-content", Static).update("\n".join(lines))