Firmware v3.03.0: DiSEqC Manchester encoder (cmd 0x8D extended), parameterized spectrum sweep (0xBA), adaptive blind scan (0xBB), error code reporting (0xBC). All new function locals moved to XDATA to fit within FX2LP 256-byte internal RAM constraint. Motor control: DiSEqC 1.2 positioner with USALS GotoX, stored positions, interactive keyboard jog, 30-second safety auto-halt. QO-100 DATV: Es'hail-2 wideband transponder tools — LNB IF calculator, narrowband scan, tune, and TS-to-video pipe (ffplay/mpv). Carrier survey: six-stage pipeline (coarse sweep → peak detection → fine sweep → blind scan → TS sample → catalog). JSON catalog with differential analysis, QO-100 optimized mode, CSV/text export. TUI: F9 Motor screen (3-column layout with signal gauge), F10 Survey screen (Full Band + QO-100 tabs). Bridge, demo, and theme updated. Docs: motor.mdx, survey.mdx, qo100-datv.mdx guide, tui.mdx updated for 10 screens. Site builds 41 pages, all links valid.
542 lines
17 KiB
Python
Executable File
542 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Genpix SkyWalker-1 DiSEqC 1.2 motor control tool.
|
|
|
|
Subcommands:
|
|
halt - Stop motor movement
|
|
east/west - Drive motor (continuous or stepped)
|
|
goto - Go to stored position slot
|
|
store - Store current position to slot
|
|
gotox - USALS GotoX (automatic orbital positioning)
|
|
limit - Set software travel limit
|
|
nolimits - Disable software limits
|
|
raw - Send raw DiSEqC bytes
|
|
interactive - Keyboard-driven jog controller with live signal
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import time
|
|
import atexit
|
|
import select
|
|
import termios
|
|
import tty
|
|
|
|
# Add tools directory to path for library import
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from skywalker_lib import SkyWalker1, signal_bar
|
|
|
|
|
|
# -- Safety timeout for continuous drive --
|
|
|
|
CONTINUOUS_DRIVE_TIMEOUT = 30.0 # seconds
|
|
|
|
|
|
# -- Subcommand handlers --
|
|
|
|
def cmd_halt(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Stop motor movement."""
|
|
sw.motor_halt()
|
|
print("Motor halted")
|
|
|
|
|
|
def cmd_east(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Drive motor east."""
|
|
steps = args.steps
|
|
if steps:
|
|
sw.motor_drive_east(steps)
|
|
print(f"Driving east {steps} step(s)")
|
|
else:
|
|
sw.motor_drive_east(0)
|
|
print(f"Driving east (continuous) -- will auto-halt after {CONTINUOUS_DRIVE_TIMEOUT:.0f}s")
|
|
print("Press Ctrl-C to stop")
|
|
_wait_with_halt(sw, CONTINUOUS_DRIVE_TIMEOUT)
|
|
|
|
|
|
def cmd_west(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Drive motor west."""
|
|
steps = args.steps
|
|
if steps:
|
|
sw.motor_drive_west(steps)
|
|
print(f"Driving west {steps} step(s)")
|
|
else:
|
|
sw.motor_drive_west(0)
|
|
print(f"Driving west (continuous) -- will auto-halt after {CONTINUOUS_DRIVE_TIMEOUT:.0f}s")
|
|
print("Press Ctrl-C to stop")
|
|
_wait_with_halt(sw, CONTINUOUS_DRIVE_TIMEOUT)
|
|
|
|
|
|
def _wait_with_halt(sw: SkyWalker1, timeout: float) -> None:
|
|
"""Wait for timeout or Ctrl-C, then halt the motor."""
|
|
try:
|
|
time.sleep(timeout)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
sw.motor_halt()
|
|
print("\nMotor halted")
|
|
|
|
|
|
def cmd_goto(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Go to a stored position slot."""
|
|
slot = args.slot
|
|
if slot == 0:
|
|
print("Going to reference position (slot 0)")
|
|
else:
|
|
print(f"Going to stored position {slot}")
|
|
sw.motor_goto_position(slot)
|
|
|
|
|
|
def cmd_store(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Store the current dish position into a slot."""
|
|
slot = args.slot
|
|
sw.motor_store_position(slot)
|
|
print(f"Current position stored in slot {slot}")
|
|
|
|
|
|
def cmd_gotox(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""USALS GotoX: calculate and drive to satellite longitude."""
|
|
sat_lon = args.sat
|
|
obs_lon = args.lon
|
|
obs_lat = args.lat
|
|
|
|
# Import usals_angle for display
|
|
from skywalker_lib import usals_angle, usals_encode_angle
|
|
|
|
angle = usals_angle(obs_lon, sat_lon, obs_lat)
|
|
hh, ll = usals_encode_angle(angle)
|
|
direction = "west" if angle < 0 else "east"
|
|
|
|
print(f"USALS GotoX")
|
|
print(f" Observer: {obs_lon:.2f} lon, {obs_lat:.2f} lat")
|
|
print(f" Satellite: {sat_lon:.2f} lon")
|
|
print(f" Motor angle: {abs(angle):.2f} deg {direction}")
|
|
print(f" DiSEqC: E0 31 6E {hh:02X} {ll:02X}")
|
|
|
|
sw.motor_goto_x(obs_lon, sat_lon)
|
|
print(" Command sent")
|
|
|
|
|
|
def cmd_limit(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Set a software travel limit at the current position."""
|
|
direction = args.direction
|
|
sw.motor_set_limit(direction)
|
|
print(f"Software {direction} limit set at current position")
|
|
|
|
|
|
def cmd_nolimits(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Disable software travel limits."""
|
|
sw.motor_disable_limits()
|
|
print("Software limits disabled")
|
|
|
|
|
|
def cmd_raw(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Send a raw DiSEqC command."""
|
|
raw_bytes = bytes(int(b, 16) for b in args.bytes)
|
|
if len(raw_bytes) < 3 or len(raw_bytes) > 6:
|
|
print("DiSEqC message must be 3-6 bytes")
|
|
sys.exit(1)
|
|
print(f"Sending DiSEqC: {raw_bytes.hex(' ')}")
|
|
sw.send_diseqc_message(raw_bytes)
|
|
print(" OK")
|
|
|
|
|
|
# -- Interactive jog controller --
|
|
|
|
def cmd_interactive(sw: SkyWalker1, args: argparse.Namespace) -> None:
|
|
"""Keyboard-driven jog controller with live signal monitoring."""
|
|
|
|
# Register atexit halt -- fires on unexpected exit, Ctrl-C leak, etc.
|
|
def emergency_halt():
|
|
try:
|
|
sw.motor_halt()
|
|
except Exception:
|
|
pass
|
|
|
|
atexit.register(emergency_halt)
|
|
|
|
# Save terminal state and switch to raw mode
|
|
fd = sys.stdin.fileno()
|
|
old_attrs = termios.tcgetattr(fd)
|
|
|
|
def restore_terminal():
|
|
termios.tcsetattr(fd, termios.TCSAFLUSH, old_attrs)
|
|
# Show cursor, clear line
|
|
sys.stdout.write("\033[?25h\n")
|
|
sys.stdout.flush()
|
|
|
|
atexit.register(restore_terminal)
|
|
|
|
tty.setraw(fd)
|
|
# Hide cursor for cleaner display
|
|
sys.stdout.write("\033[?25l")
|
|
sys.stdout.flush()
|
|
|
|
_interactive_loop(sw, fd, args.verbose)
|
|
|
|
# Cleanup (atexit handles restore, but be explicit for normal exit)
|
|
restore_terminal()
|
|
emergency_halt()
|
|
atexit.unregister(restore_terminal)
|
|
atexit.unregister(emergency_halt)
|
|
|
|
|
|
def _interactive_loop(sw: SkyWalker1, fd: int, verbose: bool) -> None:
|
|
"""Main loop for interactive jog mode."""
|
|
|
|
state = {
|
|
"driving": None, # None, 'east', or 'west'
|
|
"drive_start": 0.0, # time.time() when drive began
|
|
"store_mode": False, # True = next digit stores position
|
|
"running": True,
|
|
}
|
|
|
|
POLL_INTERVAL = 0.5 # seconds between signal refreshes (~2 Hz)
|
|
last_refresh = 0.0
|
|
|
|
_draw_header()
|
|
|
|
while state["running"]:
|
|
now = time.time()
|
|
|
|
# Auto-halt safety: stop after CONTINUOUS_DRIVE_TIMEOUT
|
|
if state["driving"] and (now - state["drive_start"]) >= CONTINUOUS_DRIVE_TIMEOUT:
|
|
sw.motor_halt()
|
|
_status_line(f"AUTO-HALT: {CONTINUOUS_DRIVE_TIMEOUT:.0f}s safety limit reached")
|
|
state["driving"] = None
|
|
|
|
# Refresh signal display at ~2 Hz
|
|
if now - last_refresh >= POLL_INTERVAL:
|
|
try:
|
|
sig = sw.signal_monitor()
|
|
_draw_signal(sig, state)
|
|
except Exception:
|
|
_draw_signal(None, state)
|
|
last_refresh = now
|
|
|
|
# Non-blocking key read
|
|
if select.select([fd], [], [], 0.05)[0]:
|
|
ch = os.read(fd, 8)
|
|
_handle_key(sw, ch, state)
|
|
|
|
|
|
def _handle_key(sw: SkyWalker1, ch: bytes, state: dict) -> None:
|
|
"""Process a keypress in interactive mode."""
|
|
|
|
# Escape sequences for arrow keys
|
|
if ch == b'\x1b[D' or ch == b'\x1b[C':
|
|
direction = 'west' if ch == b'\x1b[D' else 'east'
|
|
if state["driving"] != direction:
|
|
if direction == 'east':
|
|
sw.motor_drive_east(0)
|
|
else:
|
|
sw.motor_drive_west(0)
|
|
state["driving"] = direction
|
|
state["drive_start"] = time.time()
|
|
_status_line(f"Driving {direction}...")
|
|
return
|
|
|
|
# Space = halt
|
|
if ch == b' ':
|
|
sw.motor_halt()
|
|
state["driving"] = None
|
|
_status_line("Halted")
|
|
return
|
|
|
|
# q = quit
|
|
if ch in (b'q', b'Q', b'\x03'): # q, Q, or Ctrl-C
|
|
sw.motor_halt()
|
|
state["driving"] = None
|
|
state["running"] = False
|
|
_status_line("Quitting...")
|
|
return
|
|
|
|
# s = enter store mode (next digit saves position)
|
|
if ch == b's' or ch == b'S':
|
|
state["store_mode"] = True
|
|
_status_line("Store mode: press 1-9 to save position")
|
|
return
|
|
|
|
# g = prompt for USALS GotoX
|
|
if ch == b'g' or ch == b'G':
|
|
_gotox_prompt(sw, state)
|
|
return
|
|
|
|
# Digits 1-9: goto or store
|
|
if len(ch) == 1 and ord(ch) in range(ord('1'), ord('9') + 1):
|
|
slot = ord(ch) - ord('0')
|
|
if state["store_mode"]:
|
|
sw.motor_store_position(slot)
|
|
_status_line(f"Position stored in slot {slot}")
|
|
state["store_mode"] = False
|
|
else:
|
|
sw.motor_goto_position(slot)
|
|
state["driving"] = None
|
|
_status_line(f"Going to position {slot}")
|
|
return
|
|
|
|
# 0 = goto reference
|
|
if ch == b'0':
|
|
if state["store_mode"]:
|
|
_status_line("Slot 0 is reference -- not storable")
|
|
state["store_mode"] = False
|
|
else:
|
|
sw.motor_goto_position(0)
|
|
state["driving"] = None
|
|
_status_line("Going to reference (slot 0)")
|
|
return
|
|
|
|
# Unknown key -- clear store mode
|
|
if state["store_mode"]:
|
|
state["store_mode"] = False
|
|
_status_line("Store cancelled")
|
|
|
|
|
|
def _gotox_prompt(sw: SkyWalker1, state: dict) -> None:
|
|
"""
|
|
Prompt for USALS GotoX parameters in raw terminal mode.
|
|
|
|
Reads satellite longitude and observer longitude character-by-character
|
|
since we're in raw mode and can't use input().
|
|
"""
|
|
_status_line("GotoX: enter satellite longitude (e.g. -97.5): ")
|
|
sat_str = _raw_readline()
|
|
if sat_str is None:
|
|
_status_line("GotoX cancelled")
|
|
return
|
|
|
|
_status_line(f"Sat {sat_str} -- enter observer longitude: ")
|
|
obs_str = _raw_readline()
|
|
if obs_str is None:
|
|
_status_line("GotoX cancelled")
|
|
return
|
|
|
|
try:
|
|
sat_lon = float(sat_str)
|
|
obs_lon = float(obs_str)
|
|
except ValueError:
|
|
_status_line("Invalid coordinates")
|
|
return
|
|
|
|
from skywalker_lib import usals_angle
|
|
angle = usals_angle(obs_lon, sat_lon)
|
|
direction = "W" if angle < 0 else "E"
|
|
|
|
sw.motor_goto_x(obs_lon, sat_lon)
|
|
state["driving"] = None
|
|
_status_line(f"GotoX: sat {sat_lon} obs {obs_lon} -> {abs(angle):.1f} deg {direction}")
|
|
|
|
|
|
def _raw_readline() -> str | None:
|
|
"""Read a line of text in raw terminal mode, echoing characters."""
|
|
fd = sys.stdin.fileno()
|
|
buf = []
|
|
while True:
|
|
if select.select([fd], [], [], 30.0)[0]:
|
|
ch = os.read(fd, 1)
|
|
if ch == b'\r' or ch == b'\n':
|
|
sys.stdout.write("\r\n")
|
|
sys.stdout.flush()
|
|
return ''.join(buf)
|
|
if ch == b'\x03' or ch == b'\x1b': # Ctrl-C or Escape
|
|
return None
|
|
if ch == b'\x7f' or ch == b'\x08': # Backspace
|
|
if buf:
|
|
buf.pop()
|
|
sys.stdout.write("\b \b")
|
|
sys.stdout.flush()
|
|
continue
|
|
# Accept digits, minus, period
|
|
c = ch.decode('ascii', errors='ignore')
|
|
if c in '0123456789.-':
|
|
buf.append(c)
|
|
sys.stdout.write(c)
|
|
sys.stdout.flush()
|
|
else:
|
|
# Timeout waiting for input
|
|
return None
|
|
|
|
|
|
# -- Display helpers --
|
|
|
|
def _draw_header() -> None:
|
|
"""Print the interactive mode header (once at startup)."""
|
|
sys.stdout.write("\r\n")
|
|
sys.stdout.write(" SkyWalker-1 Motor Control\r\n")
|
|
sys.stdout.write(" ========================\r\n")
|
|
sys.stdout.write(" Left/Right : jog west/east (continuous)\r\n")
|
|
sys.stdout.write(" Space : halt\r\n")
|
|
sys.stdout.write(" 1-9 : goto stored position\r\n")
|
|
sys.stdout.write(" s + 1-9 : store to position slot\r\n")
|
|
sys.stdout.write(" g : USALS GotoX prompt\r\n")
|
|
sys.stdout.write(" q : quit\r\n")
|
|
sys.stdout.write("\r\n")
|
|
sys.stdout.flush()
|
|
|
|
|
|
def _draw_signal(sig: dict | None, state: dict) -> None:
|
|
"""Update the signal monitor line at the bottom of the display."""
|
|
# Save cursor, move to signal display area
|
|
sys.stdout.write("\033[s") # save cursor
|
|
|
|
if sig is None:
|
|
sys.stdout.write("\r\n Signal: -- no data --\033[K")
|
|
else:
|
|
snr_db = sig["snr_db"]
|
|
agc1 = sig["agc1"]
|
|
locked = sig["locked"]
|
|
power_db = sig["power_db"]
|
|
pct = sig["snr_pct"]
|
|
|
|
lock_str = "LOCK" if locked else "----"
|
|
bar = signal_bar(pct, width=25)
|
|
|
|
drive_str = ""
|
|
if state["driving"]:
|
|
elapsed = time.time() - state["drive_start"]
|
|
remaining = CONTINUOUS_DRIVE_TIMEOUT - elapsed
|
|
drive_str = f" [{state['driving'].upper()} {remaining:.0f}s]"
|
|
|
|
line = (f"\r [{lock_str}] SNR {snr_db:5.1f} dB "
|
|
f"AGC {agc1:5d} "
|
|
f"Pwr {power_db:5.1f} dB "
|
|
f"{bar}{drive_str}")
|
|
sys.stdout.write(f"\n{line}\033[K")
|
|
|
|
sys.stdout.write("\033[u") # restore cursor
|
|
sys.stdout.flush()
|
|
|
|
|
|
def _status_line(msg: str) -> None:
|
|
"""Write a status message on a dedicated line."""
|
|
sys.stdout.write(f"\r > {msg}\033[K\r\n")
|
|
sys.stdout.flush()
|
|
|
|
|
|
# -- CLI --
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
prog="motor.py",
|
|
description="Genpix SkyWalker-1 DiSEqC 1.2 motor control",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""\
|
|
examples:
|
|
%(prog)s halt
|
|
%(prog)s east --steps 10
|
|
%(prog)s west
|
|
%(prog)s goto 3
|
|
%(prog)s store 3
|
|
%(prog)s gotox --sat -97.5 --lon -96.8
|
|
%(prog)s gotox --sat -97.5 --lon -96.8 --lat 32.7
|
|
%(prog)s limit east
|
|
%(prog)s nolimits
|
|
%(prog)s raw E0 31 6B 01
|
|
%(prog)s interactive
|
|
|
|
interactive mode controls:
|
|
Arrow left/right jog west/east (continuous drive)
|
|
Space halt motor
|
|
1-9 go to stored position
|
|
s + 1-9 store position to slot
|
|
g USALS GotoX prompt
|
|
q quit
|
|
|
|
safety:
|
|
Continuous drive auto-halts after 30 seconds.
|
|
Motor is halted on exit (including unexpected termination).
|
|
""")
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
help="Show raw USB traffic")
|
|
|
|
sub = parser.add_subparsers(dest='command')
|
|
|
|
# halt
|
|
sub.add_parser('halt', help="Stop motor movement")
|
|
|
|
# east
|
|
p_east = sub.add_parser('east', help="Drive motor east")
|
|
p_east.add_argument('--steps', type=int, default=0,
|
|
help="Number of steps (0=continuous, 1-127)")
|
|
|
|
# west
|
|
p_west = sub.add_parser('west', help="Drive motor west")
|
|
p_west.add_argument('--steps', type=int, default=0,
|
|
help="Number of steps (0=continuous, 1-127)")
|
|
|
|
# goto
|
|
p_goto = sub.add_parser('goto', help="Go to stored position")
|
|
p_goto.add_argument('slot', type=int,
|
|
help="Position slot (0=reference, 1-255)")
|
|
|
|
# store
|
|
p_store = sub.add_parser('store', help="Store current position")
|
|
p_store.add_argument('slot', type=int,
|
|
help="Position slot to store to (1-255)")
|
|
|
|
# gotox
|
|
p_gotox = sub.add_parser('gotox', help="USALS GotoX (automatic positioning)")
|
|
p_gotox.add_argument('--sat', type=float, required=True,
|
|
help="Satellite longitude (negative=west, e.g. -97.5)")
|
|
p_gotox.add_argument('--lon', type=float, required=True,
|
|
help="Observer longitude (negative=west)")
|
|
p_gotox.add_argument('--lat', type=float, default=0.0,
|
|
help="Observer latitude (default: 0.0)")
|
|
|
|
# limit
|
|
p_limit = sub.add_parser('limit', help="Set software travel limit")
|
|
p_limit.add_argument('direction', choices=['east', 'west'],
|
|
help="Limit direction")
|
|
|
|
# nolimits
|
|
sub.add_parser('nolimits', help="Disable software travel limits")
|
|
|
|
# raw
|
|
p_raw = sub.add_parser('raw', help="Send raw DiSEqC bytes")
|
|
p_raw.add_argument('bytes', nargs='+', metavar='HH',
|
|
help="Hex bytes (e.g. E0 31 6B 01)")
|
|
|
|
# interactive
|
|
sub.add_parser('interactive', help="Keyboard-driven jog controller")
|
|
|
|
return parser
|
|
|
|
|
|
def main():
|
|
parser = build_parser()
|
|
args = parser.parse_args()
|
|
|
|
if not args.command:
|
|
parser.print_help()
|
|
sys.exit(0)
|
|
|
|
dispatch = {
|
|
'halt': cmd_halt,
|
|
'east': cmd_east,
|
|
'west': cmd_west,
|
|
'goto': cmd_goto,
|
|
'store': cmd_store,
|
|
'gotox': cmd_gotox,
|
|
'limit': cmd_limit,
|
|
'nolimits': cmd_nolimits,
|
|
'raw': cmd_raw,
|
|
'interactive': cmd_interactive,
|
|
}
|
|
|
|
handler = dispatch.get(args.command)
|
|
if handler is None:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
with SkyWalker1(verbose=args.verbose) as sw:
|
|
# Boot demodulator and enable LNB power for DiSEqC
|
|
sw.ensure_booted()
|
|
sw.start_intersil(on=True)
|
|
handler(sw, args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|