Ryan Malloy bbdcb243dc Normalize line endings to LF across entire repository
Apply .gitattributes normalization to convert all CRLF line
endings inherited from Windows-origin source files to Unix LF.
175 files, zero content changes.
2026-02-20 10:55:50 -07:00

542 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Genpix SkyWalker-1 DiSEqC 1.2 motor control tool.
Subcommands:
halt - Stop motor movement
east/west - Drive motor (continuous or stepped)
goto - Go to stored position slot
store - Store current position to slot
gotox - USALS GotoX (automatic orbital positioning)
limit - Set software travel limit
nolimits - Disable software limits
raw - Send raw DiSEqC bytes
interactive - Keyboard-driven jog controller with live signal
"""
import sys
import os
import argparse
import time
import atexit
import select
import termios
import tty
# Add tools directory to path for library import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from skywalker_lib import SkyWalker1, signal_bar
# -- Safety timeout for continuous drive --
CONTINUOUS_DRIVE_TIMEOUT = 30.0 # seconds
# -- Subcommand handlers --
def cmd_halt(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Stop motor movement."""
sw.motor_halt()
print("Motor halted")
def cmd_east(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Drive motor east."""
steps = args.steps
if steps:
sw.motor_drive_east(steps)
print(f"Driving east {steps} step(s)")
else:
sw.motor_drive_east(0)
print(f"Driving east (continuous) -- will auto-halt after {CONTINUOUS_DRIVE_TIMEOUT:.0f}s")
print("Press Ctrl-C to stop")
_wait_with_halt(sw, CONTINUOUS_DRIVE_TIMEOUT)
def cmd_west(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Drive motor west."""
steps = args.steps
if steps:
sw.motor_drive_west(steps)
print(f"Driving west {steps} step(s)")
else:
sw.motor_drive_west(0)
print(f"Driving west (continuous) -- will auto-halt after {CONTINUOUS_DRIVE_TIMEOUT:.0f}s")
print("Press Ctrl-C to stop")
_wait_with_halt(sw, CONTINUOUS_DRIVE_TIMEOUT)
def _wait_with_halt(sw: SkyWalker1, timeout: float) -> None:
"""Wait for timeout or Ctrl-C, then halt the motor."""
try:
time.sleep(timeout)
except KeyboardInterrupt:
pass
finally:
sw.motor_halt()
print("\nMotor halted")
def cmd_goto(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Go to a stored position slot."""
slot = args.slot
if slot == 0:
print("Going to reference position (slot 0)")
else:
print(f"Going to stored position {slot}")
sw.motor_goto_position(slot)
def cmd_store(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Store the current dish position into a slot."""
slot = args.slot
sw.motor_store_position(slot)
print(f"Current position stored in slot {slot}")
def cmd_gotox(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""USALS GotoX: calculate and drive to satellite longitude."""
sat_lon = args.sat
obs_lon = args.lon
obs_lat = args.lat
# Import usals_angle for display
from skywalker_lib import usals_angle, usals_encode_angle
angle = usals_angle(obs_lon, sat_lon, obs_lat)
hh, ll = usals_encode_angle(angle)
direction = "west" if angle < 0 else "east"
print(f"USALS GotoX")
print(f" Observer: {obs_lon:.2f} lon, {obs_lat:.2f} lat")
print(f" Satellite: {sat_lon:.2f} lon")
print(f" Motor angle: {abs(angle):.2f} deg {direction}")
print(f" DiSEqC: E0 31 6E {hh:02X} {ll:02X}")
sw.motor_goto_x(obs_lon, sat_lon)
print(" Command sent")
def cmd_limit(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Set a software travel limit at the current position."""
direction = args.direction
sw.motor_set_limit(direction)
print(f"Software {direction} limit set at current position")
def cmd_nolimits(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Disable software travel limits."""
sw.motor_disable_limits()
print("Software limits disabled")
def cmd_raw(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Send a raw DiSEqC command."""
raw_bytes = bytes(int(b, 16) for b in args.bytes)
if len(raw_bytes) < 3 or len(raw_bytes) > 6:
print("DiSEqC message must be 3-6 bytes")
sys.exit(1)
print(f"Sending DiSEqC: {raw_bytes.hex(' ')}")
sw.send_diseqc_message(raw_bytes)
print(" OK")
# -- Interactive jog controller --
def cmd_interactive(sw: SkyWalker1, args: argparse.Namespace) -> None:
"""Keyboard-driven jog controller with live signal monitoring."""
# Register atexit halt -- fires on unexpected exit, Ctrl-C leak, etc.
def emergency_halt():
try:
sw.motor_halt()
except Exception:
pass
atexit.register(emergency_halt)
# Save terminal state and switch to raw mode
fd = sys.stdin.fileno()
old_attrs = termios.tcgetattr(fd)
def restore_terminal():
termios.tcsetattr(fd, termios.TCSAFLUSH, old_attrs)
# Show cursor, clear line
sys.stdout.write("\033[?25h\n")
sys.stdout.flush()
atexit.register(restore_terminal)
tty.setraw(fd)
# Hide cursor for cleaner display
sys.stdout.write("\033[?25l")
sys.stdout.flush()
_interactive_loop(sw, fd, args.verbose)
# Cleanup (atexit handles restore, but be explicit for normal exit)
restore_terminal()
emergency_halt()
atexit.unregister(restore_terminal)
atexit.unregister(emergency_halt)
def _interactive_loop(sw: SkyWalker1, fd: int, verbose: bool) -> None:
"""Main loop for interactive jog mode."""
state = {
"driving": None, # None, 'east', or 'west'
"drive_start": 0.0, # time.time() when drive began
"store_mode": False, # True = next digit stores position
"running": True,
}
POLL_INTERVAL = 0.5 # seconds between signal refreshes (~2 Hz)
last_refresh = 0.0
_draw_header()
while state["running"]:
now = time.time()
# Auto-halt safety: stop after CONTINUOUS_DRIVE_TIMEOUT
if state["driving"] and (now - state["drive_start"]) >= CONTINUOUS_DRIVE_TIMEOUT:
sw.motor_halt()
_status_line(f"AUTO-HALT: {CONTINUOUS_DRIVE_TIMEOUT:.0f}s safety limit reached")
state["driving"] = None
# Refresh signal display at ~2 Hz
if now - last_refresh >= POLL_INTERVAL:
try:
sig = sw.signal_monitor()
_draw_signal(sig, state)
except Exception:
_draw_signal(None, state)
last_refresh = now
# Non-blocking key read
if select.select([fd], [], [], 0.05)[0]:
ch = os.read(fd, 8)
_handle_key(sw, ch, state)
def _handle_key(sw: SkyWalker1, ch: bytes, state: dict) -> None:
"""Process a keypress in interactive mode."""
# Escape sequences for arrow keys
if ch == b'\x1b[D' or ch == b'\x1b[C':
direction = 'west' if ch == b'\x1b[D' else 'east'
if state["driving"] != direction:
if direction == 'east':
sw.motor_drive_east(0)
else:
sw.motor_drive_west(0)
state["driving"] = direction
state["drive_start"] = time.time()
_status_line(f"Driving {direction}...")
return
# Space = halt
if ch == b' ':
sw.motor_halt()
state["driving"] = None
_status_line("Halted")
return
# q = quit
if ch in (b'q', b'Q', b'\x03'): # q, Q, or Ctrl-C
sw.motor_halt()
state["driving"] = None
state["running"] = False
_status_line("Quitting...")
return
# s = enter store mode (next digit saves position)
if ch == b's' or ch == b'S':
state["store_mode"] = True
_status_line("Store mode: press 1-9 to save position")
return
# g = prompt for USALS GotoX
if ch == b'g' or ch == b'G':
_gotox_prompt(sw, state)
return
# Digits 1-9: goto or store
if len(ch) == 1 and ord(ch) in range(ord('1'), ord('9') + 1):
slot = ord(ch) - ord('0')
if state["store_mode"]:
sw.motor_store_position(slot)
_status_line(f"Position stored in slot {slot}")
state["store_mode"] = False
else:
sw.motor_goto_position(slot)
state["driving"] = None
_status_line(f"Going to position {slot}")
return
# 0 = goto reference
if ch == b'0':
if state["store_mode"]:
_status_line("Slot 0 is reference -- not storable")
state["store_mode"] = False
else:
sw.motor_goto_position(0)
state["driving"] = None
_status_line("Going to reference (slot 0)")
return
# Unknown key -- clear store mode
if state["store_mode"]:
state["store_mode"] = False
_status_line("Store cancelled")
def _gotox_prompt(sw: SkyWalker1, state: dict) -> None:
"""
Prompt for USALS GotoX parameters in raw terminal mode.
Reads satellite longitude and observer longitude character-by-character
since we're in raw mode and can't use input().
"""
_status_line("GotoX: enter satellite longitude (e.g. -97.5): ")
sat_str = _raw_readline()
if sat_str is None:
_status_line("GotoX cancelled")
return
_status_line(f"Sat {sat_str} -- enter observer longitude: ")
obs_str = _raw_readline()
if obs_str is None:
_status_line("GotoX cancelled")
return
try:
sat_lon = float(sat_str)
obs_lon = float(obs_str)
except ValueError:
_status_line("Invalid coordinates")
return
from skywalker_lib import usals_angle
angle = usals_angle(obs_lon, sat_lon)
direction = "W" if angle < 0 else "E"
sw.motor_goto_x(obs_lon, sat_lon)
state["driving"] = None
_status_line(f"GotoX: sat {sat_lon} obs {obs_lon} -> {abs(angle):.1f} deg {direction}")
def _raw_readline() -> str | None:
"""Read a line of text in raw terminal mode, echoing characters."""
fd = sys.stdin.fileno()
buf = []
while True:
if select.select([fd], [], [], 30.0)[0]:
ch = os.read(fd, 1)
if ch == b'\r' or ch == b'\n':
sys.stdout.write("\r\n")
sys.stdout.flush()
return ''.join(buf)
if ch == b'\x03' or ch == b'\x1b': # Ctrl-C or Escape
return None
if ch == b'\x7f' or ch == b'\x08': # Backspace
if buf:
buf.pop()
sys.stdout.write("\b \b")
sys.stdout.flush()
continue
# Accept digits, minus, period
c = ch.decode('ascii', errors='ignore')
if c in '0123456789.-':
buf.append(c)
sys.stdout.write(c)
sys.stdout.flush()
else:
# Timeout waiting for input
return None
# -- Display helpers --
def _draw_header() -> None:
"""Print the interactive mode header (once at startup)."""
sys.stdout.write("\r\n")
sys.stdout.write(" SkyWalker-1 Motor Control\r\n")
sys.stdout.write(" ========================\r\n")
sys.stdout.write(" Left/Right : jog west/east (continuous)\r\n")
sys.stdout.write(" Space : halt\r\n")
sys.stdout.write(" 1-9 : goto stored position\r\n")
sys.stdout.write(" s + 1-9 : store to position slot\r\n")
sys.stdout.write(" g : USALS GotoX prompt\r\n")
sys.stdout.write(" q : quit\r\n")
sys.stdout.write("\r\n")
sys.stdout.flush()
def _draw_signal(sig: dict | None, state: dict) -> None:
"""Update the signal monitor line at the bottom of the display."""
# Save cursor, move to signal display area
sys.stdout.write("\033[s") # save cursor
if sig is None:
sys.stdout.write("\r\n Signal: -- no data --\033[K")
else:
snr_db = sig["snr_db"]
agc1 = sig["agc1"]
locked = sig["locked"]
power_db = sig["power_db"]
pct = sig["snr_pct"]
lock_str = "LOCK" if locked else "----"
bar = signal_bar(pct, width=25)
drive_str = ""
if state["driving"]:
elapsed = time.time() - state["drive_start"]
remaining = CONTINUOUS_DRIVE_TIMEOUT - elapsed
drive_str = f" [{state['driving'].upper()} {remaining:.0f}s]"
line = (f"\r [{lock_str}] SNR {snr_db:5.1f} dB "
f"AGC {agc1:5d} "
f"Pwr {power_db:5.1f} dB "
f"{bar}{drive_str}")
sys.stdout.write(f"\n{line}\033[K")
sys.stdout.write("\033[u") # restore cursor
sys.stdout.flush()
def _status_line(msg: str) -> None:
"""Write a status message on a dedicated line."""
sys.stdout.write(f"\r > {msg}\033[K\r\n")
sys.stdout.flush()
# -- CLI --
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="motor.py",
description="Genpix SkyWalker-1 DiSEqC 1.2 motor control",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s halt
%(prog)s east --steps 10
%(prog)s west
%(prog)s goto 3
%(prog)s store 3
%(prog)s gotox --sat -97.5 --lon -96.8
%(prog)s gotox --sat -97.5 --lon -96.8 --lat 32.7
%(prog)s limit east
%(prog)s nolimits
%(prog)s raw E0 31 6B 01
%(prog)s interactive
interactive mode controls:
Arrow left/right jog west/east (continuous drive)
Space halt motor
1-9 go to stored position
s + 1-9 store position to slot
g USALS GotoX prompt
q quit
safety:
Continuous drive auto-halts after 30 seconds.
Motor is halted on exit (including unexpected termination).
""")
parser.add_argument('-v', '--verbose', action='store_true',
help="Show raw USB traffic")
sub = parser.add_subparsers(dest='command')
# halt
sub.add_parser('halt', help="Stop motor movement")
# east
p_east = sub.add_parser('east', help="Drive motor east")
p_east.add_argument('--steps', type=int, default=0,
help="Number of steps (0=continuous, 1-127)")
# west
p_west = sub.add_parser('west', help="Drive motor west")
p_west.add_argument('--steps', type=int, default=0,
help="Number of steps (0=continuous, 1-127)")
# goto
p_goto = sub.add_parser('goto', help="Go to stored position")
p_goto.add_argument('slot', type=int,
help="Position slot (0=reference, 1-255)")
# store
p_store = sub.add_parser('store', help="Store current position")
p_store.add_argument('slot', type=int,
help="Position slot to store to (1-255)")
# gotox
p_gotox = sub.add_parser('gotox', help="USALS GotoX (automatic positioning)")
p_gotox.add_argument('--sat', type=float, required=True,
help="Satellite longitude (negative=west, e.g. -97.5)")
p_gotox.add_argument('--lon', type=float, required=True,
help="Observer longitude (negative=west)")
p_gotox.add_argument('--lat', type=float, default=0.0,
help="Observer latitude (default: 0.0)")
# limit
p_limit = sub.add_parser('limit', help="Set software travel limit")
p_limit.add_argument('direction', choices=['east', 'west'],
help="Limit direction")
# nolimits
sub.add_parser('nolimits', help="Disable software travel limits")
# raw
p_raw = sub.add_parser('raw', help="Send raw DiSEqC bytes")
p_raw.add_argument('bytes', nargs='+', metavar='HH',
help="Hex bytes (e.g. E0 31 6B 01)")
# interactive
sub.add_parser('interactive', help="Keyboard-driven jog controller")
return parser
def main():
parser = build_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(0)
dispatch = {
'halt': cmd_halt,
'east': cmd_east,
'west': cmd_west,
'goto': cmd_goto,
'store': cmd_store,
'gotox': cmd_gotox,
'limit': cmd_limit,
'nolimits': cmd_nolimits,
'raw': cmd_raw,
'interactive': cmd_interactive,
}
handler = dispatch.get(args.command)
if handler is None:
parser.print_help()
sys.exit(1)
with SkyWalker1(verbose=args.verbose) as sw:
# Boot demodulator and enable LNB power for DiSEqC
sw.ensure_booted()
sw.start_intersil(on=True)
handler(sw, args)
if __name__ == '__main__':
main()