Custom firmware (SDCC + fx2lib) implements all stock vendor commands (0x80-0x94) plus new commands for spectrum sweep (0xB0), raw BCM4500 register access (0xB1/0xB2), and blind scan (0xB3). Compiles to 6.3KB of code with healthy RAM margins. RAM loader (fw_load.py) uses the FX2 0xA0 vendor request to load firmware into RAM without touching EEPROM -- power cycle restores factory firmware. Supports Intel HEX and raw binary formats.
617 lines
20 KiB
Python
Executable File
617 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
"""
Genpix SkyWalker-1 RAM firmware loader.

Loads firmware into the Cypress FX2 (CY7C68013A) internal/external RAM
via the standard 0xA0 vendor request. This does NOT touch the EEPROM --
power-cycling the device restores the factory-programmed firmware.

Use case: firmware development and testing. Load, test, power-cycle.

Loading sequence:
  1. Halt CPU: write 0x01 to CPUCS register at 0xE600
  2. Write code segments into RAM
  3. Start CPU: write 0x00 to CPUCS at 0xE600

After starting, the FX2 runs the new firmware and typically
re-enumerates on USB with new VID/PID/descriptors.

Supports Intel HEX (.ihx/.hex) and raw binary (.bix/.bin) formats.
"""

import sys
import argparse
import time
import os

# pyusb is the only third-party dependency: print an install hint and exit
# with status 1 instead of showing an ImportError traceback.
try:
    import usb.core
    import usb.util
except ImportError:
    print("pyusb required: pip install pyusb")
    sys.exit(1)

# Genpix SkyWalker-1
SKYWALKER_VID = 0x09C0
SKYWALKER_PID = 0x0203

# Bare/unprogrammed Cypress FX2 (no EEPROM or blank EEPROM)
CYPRESS_VID = 0x04B4
CYPRESS_PID = 0x8613

# FX2 vendor request for RAM access (built into silicon boot ROM)
FX2_RAM_REQUEST = 0xA0

# CPUCS register -- controls 8051 run/halt state
# (0x01 is written to halt the CPU, 0x00 to start it again)
CPUCS_ADDR = 0xE600

# Max bytes per control transfer. The FX2 TRM says 64 bytes for
# the control endpoint buffer, so we stay conservative.
# Used as the chunk size for both RAM writes and RAM reads.
CHUNK_SIZE = 64


def find_device(force=False):
    """
    Locate the target device on the USB bus.

    A SkyWalker-1 is looked for first, then a bare Cypress FX2; the first
    match is announced on stdout and returned as a pyusb device object.

    When neither is present a message is printed (its wording depends on
    `force`) and the process exits with status 1 -- `force` never makes
    this function return a different device.
    """
    candidates = (
        ("SkyWalker-1", SKYWALKER_VID, SKYWALKER_PID),
        ("bare Cypress FX2", CYPRESS_VID, CYPRESS_PID),
    )

    for label, vid, pid in candidates:
        found = usb.core.find(idVendor=vid, idProduct=pid)
        if found is None:
            continue
        print(f"Found {label}: Bus {found.bus} Addr {found.address} "
              f"(VID 0x{vid:04X} PID 0x{pid:04X})")
        return found

    # Nothing matched: --force only changes the explanation, not the outcome.
    if not force:
        print("No SkyWalker-1 (09C0:0203) or bare FX2 (04B4:8613) found.")
        print("Is the device plugged in?")
    else:
        print("No SkyWalker-1 or bare FX2 found. --force is set but no "
              "target device discovered.")
    sys.exit(1)


def detach_driver(dev):
    """
    Detach any active kernel driver from the device and select its
    default configuration.

    Args:
        dev: pyusb device; iterated as configurations -> interfaces.

    Returns:
        The number of the last interface a kernel driver was detached
        from, or None when no driver had to be detached.

    Exits with status 1 (after printing a hint) when a driver is active
    but cannot be detached.
    """
    intf_num = None

    for cfg in dev:
        for intf in cfg:
            number = intf.bInterfaceNumber

            try:
                active = dev.is_kernel_driver_active(number)
            except NotImplementedError:
                # Backends/platforms without kernel-driver management
                # report it this way; there is nothing to detach then.
                active = False
            if not active:
                continue

            try:
                dev.detach_kernel_driver(number)
            except usb.core.USBError as e:
                print(f"Cannot detach driver: {e}")
                print("Try: sudo modprobe -r dvb_usb_gp8psk")
                sys.exit(1)
            intf_num = number

    try:
        dev.set_configuration()
    except (usb.core.USBError, NotImplementedError):
        # Best effort: failure to (re)select the configuration is ignored
        # here. Only USB-level failures are swallowed, so Ctrl-C and
        # programming errors still propagate.
        pass

    return intf_num


def fx2_ram_write(dev, addr, data):
    """
    Send `data` to FX2 RAM starting at `addr` using vendor request 0xA0.

    The address is passed as wValue of an OUT vendor control transfer with
    a 2000 ms timeout. Returns whatever `dev.ctrl_transfer` returns.
    """
    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT
    timeout_ms = 2000
    return dev.ctrl_transfer(
        request_type, FX2_RAM_REQUEST, addr, 0, data, timeout_ms)


def fx2_ram_read(dev, addr, length):
    """
    Fetch `length` bytes of FX2 RAM starting at `addr` using vendor
    request 0xA0.

    Returns the data as `bytes`, or None when the control transfer raises
    a USBError.
    """
    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN
    timeout_ms = 2000
    try:
        payload = dev.ctrl_transfer(
            request_type, FX2_RAM_REQUEST, addr, 0, length, timeout_ms)
    except usb.core.USBError:
        return None
    return bytes(payload)


def cpu_halt(dev):
    """Stop the FX2's 8051 CPU: a single 0x01 byte is written to CPUCS."""
    halt_value = b'\x01'
    fx2_ram_write(dev, CPUCS_ADDR, halt_value)


def cpu_start(dev):
    """Let the FX2's 8051 CPU run: a single 0x00 byte is written to CPUCS."""
    run_value = b'\x00'
    fx2_ram_write(dev, CPUCS_ADDR, run_value)


# -- Intel HEX parser --

def parse_ihx(data):
    """
    Parse an Intel HEX file. Returns list of (address, bytes) segments.

    Args:
        data: File contents as `str` or `bytes`.

    Returns:
        One (absolute_address, payload) tuple per data record, in file
        order. Parsing stops at the first EOF record; blank lines are
        skipped.

    Raises:
        ValueError: a line has no ':' start code, is too short, is not
            valid hex, holds fewer bytes than its byte count announces,
            fails its checksum, or is a type 02/04 record without exactly
            2 data bytes. The message names the offending line number.

    Record types:
        00 = data
        01 = EOF
        02 = extended segment address (base = value * 16)
        04 = extended linear address (base = value << 16)
    Any other record type (03, 05, ...) is silently ignored.
    """
    segments = []
    base_addr = 0

    for line_num, raw_line in enumerate(data.splitlines(), start=1):
        line = raw_line.strip()
        if not line:
            continue
        if isinstance(line, bytes):
            line = line.decode('ascii', errors='replace')

        if not line.startswith(':'):
            raise ValueError(f"Line {line_num}: missing start code ':'")

        # Strip the colon and decode hex
        hex_str = line[1:]
        if len(hex_str) < 10:
            raise ValueError(f"Line {line_num}: too short")

        try:
            raw = bytes.fromhex(hex_str)
        except ValueError as exc:
            raise ValueError(f"Line {line_num}: invalid hex") from exc

        # bytes.fromhex() skips whitespace, so the decoded record can be
        # shorter than the character count suggested. A record needs at
        # least count + address(2) + type + checksum.
        if len(raw) < 5:
            raise ValueError(f"Line {line_num}: too short")

        byte_count = raw[0]
        addr = (raw[1] << 8) | raw[2]
        rec_type = raw[3]

        # Validate the length BEFORE indexing the checksum: a truncated
        # record must be reported as ValueError, not escape as IndexError.
        available = len(raw) - 5
        if available < byte_count:
            raise ValueError(
                f"Line {line_num}: data length mismatch "
                f"(header says {byte_count}, got {available})")

        rec_data = raw[4:4 + byte_count]
        checksum = raw[4 + byte_count]

        # Verify checksum (two's complement of sum of all bytes before it)
        calc_check = (-sum(raw[:4 + byte_count])) & 0xFF
        if checksum != calc_check:
            raise ValueError(
                f"Line {line_num}: checksum mismatch "
                f"(expected 0x{calc_check:02X}, got 0x{checksum:02X})")

        if rec_type == 0x00:
            # Data record
            segments.append((base_addr + addr, bytes(rec_data)))

        elif rec_type == 0x01:
            # EOF
            break

        elif rec_type == 0x02:
            # Extended segment address
            if byte_count != 2:
                raise ValueError(
                    f"Line {line_num}: type 02 record must have 2 data bytes")
            base_addr = ((rec_data[0] << 8) | rec_data[1]) << 4

        elif rec_type == 0x04:
            # Extended linear address
            if byte_count != 2:
                raise ValueError(
                    f"Line {line_num}: type 04 record must have 2 data bytes")
            base_addr = ((rec_data[0] << 8) | rec_data[1]) << 16

        # Silently ignore unknown record types (03, 05, etc.)

    return segments


def coalesce_segments(segments):
    """
    Merge adjacent/overlapping segments into contiguous blocks.

    Segments are processed in ascending start-address order. A segment that
    starts inside, or directly after, the block being built is folded into
    it; where it overlaps, the bytes already in the block are kept and only
    the part reaching past the block's end is appended. A segment that
    starts beyond the block's end opens a new block -- gaps are never
    padded.

    Args:
        segments: Iterable-as-list of (address, bytes) tuples.

    Returns:
        List of (address, bytes) tuples sorted by address, each one
        internally contiguous. Empty input gives an empty list.
    """
    if not segments:
        return []

    # Sort by address
    ordered = sorted(segments, key=lambda s: s[0])

    merged = []
    cur_addr, first_data = ordered[0]
    cur_data = bytearray(first_data)

    for addr, data in ordered[1:]:
        cur_end = cur_addr + len(cur_data)

        if addr > cur_end:
            # Gap: close the current block and start a new one.
            merged.append((cur_addr, bytes(cur_data)))
            cur_addr = addr
            cur_data = bytearray(data)
            continue

        # Adjacent (overlap == 0) or overlapping (overlap > 0). Slicing past
        # the end of `data` yields b'', which covers a fully contained
        # segment without a special case.
        overlap = cur_end - addr
        cur_data.extend(data[overlap:])

    merged.append((cur_addr, bytes(cur_data)))
    return merged


def load_firmware_file(path):
    """
    Load firmware from .ihx/.hex (Intel HEX) or .bix/.bin (raw binary).

    The format is chosen by file extension. For any other extension the
    content decides: a file starting with ':' is parsed as Intel HEX,
    everything else is treated as raw binary.

    Args:
        path: Path of the firmware file.

    Returns:
        List of (address, bytes) segments. Intel HEX segments are coalesced;
        a raw binary becomes a single segment at address 0x0000.

    Exits with status 1 when a raw-binary file is empty (whatever its
    extension). Errors from opening the file and ValueError from the
    Intel HEX parser propagate to the caller.
    """
    ext = os.path.splitext(path)[1].lower()

    with open(path, 'rb') as f:
        raw = f.read()

    if ext in ('.ihx', '.hex'):
        is_hex = True
    elif ext in ('.bix', '.bin'):
        is_hex = False
    else:
        # Try to auto-detect: if it starts with ':', assume Intel HEX
        is_hex = raw.startswith(b':')

    if is_hex:
        return coalesce_segments(parse_ihx(raw))

    # Raw binary loads at address 0x0000. An empty image is rejected here
    # for every raw-binary path, so callers never see a zero-length segment.
    if not raw:
        print(f"Empty file: {path}")
        sys.exit(1)
    return [(0x0000, raw)]


def write_segments(dev, segments, verbose=False):
    """
    Write firmware segments to FX2 RAM in CHUNK_SIZE pieces.

    Args:
        dev: Device handle passed through to fx2_ram_write.
        segments: Iterable of (address, bytes) tuples.
        verbose: When true, print a progress line every 0x400 bytes and a
            final 100% line for segments larger than one chunk.

    Returns:
        Total number of bytes the device reported as written. A short
        write is reported and counted with its real size, so the total
        falls below the image size and the caller can detect it. On a
        USBError the function stops and returns the total reached so far.
    """
    total = 0

    for seg_addr, seg_data in segments:
        seg_len = len(seg_data)
        if seg_len == 0:
            # Nothing to transfer; also avoids printing a negative end
            # address for the range below.
            continue

        seg_end = seg_addr + seg_len - 1
        print(f" 0x{seg_addr:04X}-0x{seg_end:04X} ({seg_len} bytes)")

        for offset in range(0, seg_len, CHUNK_SIZE):
            chunk = seg_data[offset:offset + CHUNK_SIZE]
            chunk_len = len(chunk)
            addr = seg_addr + offset

            try:
                written = fx2_ram_write(dev, addr, chunk)
            except usb.core.USBError as e:
                print(f"\n Write error at 0x{addr:04X}: {e}")
                return total

            if written != chunk_len:
                print(f"\n Short write at 0x{addr:04X}: "
                      f"sent {chunk_len}, wrote {written}")

            if verbose and offset % 0x400 == 0:
                pct = offset * 100 // seg_len
                print(f"\r 0x{addr:04X} [{pct:3d}%]", end="", flush=True)

            # Count what actually arrived, not what was sent.
            total += written

        if verbose and seg_len > CHUNK_SIZE:
            print(f"\r 0x{seg_addr + seg_len - 1:04X} [100%] ")

    return total


# -- Subcommand handlers --

def cmd_load(args):
    """
    Load firmware into FX2 RAM (the `load` subcommand).

    Parses `args.file`, prints a summary, locates the device, optionally
    halts the CPU, writes all segments and optionally restarts the CPU.

    Args:
        args: Parsed argparse namespace providing `file`, `force`,
            `no_reset`, `verbose` and `wait`.

    Exits with status 1 when the file is missing or holds no segments,
    when the image reaches past address 0xFFFF, when the device is not a
    recognised one (unless --force), or when fewer bytes were written than
    the image contains. In that last case the CPU is NOT restarted.
    """
    if not os.path.exists(args.file):
        print(f"File not found: {args.file}")
        sys.exit(1)

    # Parse firmware file
    segments = load_firmware_file(args.file)
    if not segments:
        print("No code segments found in firmware file")
        sys.exit(1)

    total_bytes = sum(len(d) for _, d in segments)
    min_addr = min(a for a, _ in segments)
    max_addr = max(a + len(d) - 1 for a, d in segments)

    print("SkyWalker-1 RAM Firmware Loader")
    print('=' * 40)
    print(f"\nFirmware: {args.file}")
    print(f" Segments: {len(segments)}")
    print(f" Total size: {total_bytes} bytes")
    print(f" Address: 0x{min_addr:04X} - 0x{max_addr:04X}")

    # The target address travels in wValue, a 16-bit field of the USB setup
    # packet, so anything above 0xFFFF cannot be addressed. Refuse it up
    # front instead of sending an address that does not fit.
    if max_addr > 0xFFFF:
        print(f"\n ERROR: image extends to 0x{max_addr:X}, past the 16-bit "
              f"address range (0x0000-0xFFFF) of the 0xA0 request")
        sys.exit(1)

    # Check for CPUCS region overlap (warn but don't block)
    for addr, data in segments:
        seg_end = addr + len(data) - 1
        if addr <= CPUCS_ADDR <= seg_end:
            print(f"\n WARNING: Segment at 0x{addr:04X}-0x{seg_end:04X} "
                  f"overlaps CPUCS (0x{CPUCS_ADDR:04X})")
            print(" The CPU halt/start writes to 0xE600 will clobber "
                  "this region")

    print()

    # Connect
    dev = find_device(force=args.force)

    # Check VID/PID if it's not a known device
    vid = dev.idVendor
    pid = dev.idProduct
    is_skywalker = (vid == SKYWALKER_VID and pid == SKYWALKER_PID)
    is_bare_fx2 = (vid == CYPRESS_VID and pid == CYPRESS_PID)

    if not is_skywalker and not is_bare_fx2 and not args.force:
        print(f"\n Unknown device VID 0x{vid:04X} PID 0x{pid:04X}")
        print(" Expected SkyWalker-1 (09C0:0203) or bare FX2 (04B4:8613)")
        print(" Use --force to override")
        sys.exit(1)

    intf = detach_driver(dev)

    try:
        # Step 1: Halt CPU
        if not args.no_reset:
            print("\n[1/3] Halting CPU (CPUCS = 0x01)...")
            cpu_halt(dev)
            time.sleep(0.05)

            # Verify halt (a failed or unexpected readback is only a warning)
            readback = fx2_ram_read(dev, CPUCS_ADDR, 1)
            if readback and readback[0] & 0x01:
                print(" CPU halted")
            else:
                val = f"0x{readback[0]:02X}" if readback else "read failed"
                print(f" WARNING: CPUCS readback = {val} (expected 0x01)")
                print(" Proceeding anyway...")
        else:
            print("\n[1/3] Skipping CPU reset (--no-reset)")

        # Step 2: Load segments
        step = "2/3" if not args.no_reset else "2/2"
        print(f"\n[{step}] Loading {len(segments)} segment(s) into RAM...")
        written = write_segments(dev, segments, verbose=args.verbose)
        print(f"\n {written} bytes loaded")

        if written != total_bytes:
            print(f" WARNING: expected {total_bytes}, wrote {written}")
            # Never release the CPU into a partially written image.
            # sys.exit() still runs the `finally` clean-up below.
            if not args.no_reset:
                print(" Load incomplete -- CPU left halted. Power-cycle the "
                      "device to restore the factory firmware.")
            sys.exit(1)

        # Step 3: Start CPU
        if not args.no_reset:
            print("\n[3/3] Starting CPU (CPUCS = 0x00)...")
            cpu_start(dev)
            print(" CPU released")
            print("\n Firmware is running. The device will re-enumerate")
            print(" with new USB descriptors if the firmware does so.")

            if args.wait:
                _wait_for_reenumeration(args.wait)
        else:
            print("\n Segments loaded (CPU not reset)")

    finally:
        # Only re-attach if we didn't just start new firmware
        # (the device may have already re-enumerated away)
        if args.no_reset and intf is not None:
            try:
                usb.util.release_interface(dev, intf)
                dev.attach_kernel_driver(intf)
                print("\nRe-attached kernel driver")
            except (usb.core.USBError, NotImplementedError):
                # Best effort; tell the user how to recover manually, the
                # same way the `read` subcommand does.
                print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")


def _wait_for_reenumeration(timeout):
    """
    Poll the bus until a known device shows up or `timeout` seconds pass.

    After an initial one-second pause, the SkyWalker-1 and bare-FX2 IDs are
    checked every 0.5 s. The first hit is reported and ends the wait;
    otherwise a timeout notice is printed. Nothing is returned.
    """
    print(f"\n Waiting up to {timeout}s for re-enumeration...")
    deadline = time.time() + timeout
    time.sleep(1.0)  # Give the device a moment to disconnect

    # Only these two ID pairs are recognised; firmware that enumerates
    # with other IDs ends in the timeout message below.
    known_ids = (
        (SKYWALKER_VID, SKYWALKER_PID),
        (CYPRESS_VID, CYPRESS_PID),
    )

    while time.time() < deadline:
        for vid, pid in known_ids:
            dev = usb.core.find(idVendor=vid, idProduct=pid)
            if dev is not None:
                print(f" Device re-appeared: Bus {dev.bus} Addr {dev.address} "
                      f"(0x{vid:04X}:0x{pid:04X})")
                return

        print(".", end="", flush=True)
        time.sleep(0.5)

    print(f"\n Timeout -- device did not re-enumerate within {timeout}s")
    print(" The firmware may use different VID/PID. Check 'lsusb'.")


def cmd_reset(args):
    """
    Reset the FX2 CPU (the `reset` subcommand): halt it, then start it.

    Uses `args.force` for device lookup and, when `args.wait` is non-zero,
    waits that many seconds for the device to re-enumerate. The kernel
    driver is not re-attached afterwards.
    """
    print("SkyWalker-1 CPU Reset")
    print("=" * 40)

    dev = find_device(force=args.force)
    # Called for its side effects; the returned interface number is not
    # needed because nothing is re-attached after a reset.
    detach_driver(dev)

    print("\nHalting CPU...")
    cpu_halt(dev)
    time.sleep(0.05)
    print(" CPUCS = 0x01 (halted)")

    time.sleep(0.1)

    print("Starting CPU...")
    cpu_start(dev)
    print(" CPUCS = 0x00 (running)")
    print("\nCPU reset complete. Device will re-enumerate.")

    if args.wait:
        _wait_for_reenumeration(args.wait)


def cmd_read(args):
    """
    Read and hex-dump FX2 RAM contents (the `read` subcommand).

    Args:
        args: Parsed argparse namespace providing `addr`, `length`,
            `output` and `force`.

    Reads `length` bytes starting at `addr` in CHUNK_SIZE pieces, prints a
    16-bytes-per-row hex/ASCII dump and, when `args.output` is set, saves
    the raw bytes to that file. A chunk that fails or comes back short is
    counted as an error and its missing bytes are shown as 0xFF, so the
    dump always stays aligned with the addresses.

    Exits with status 1 when the requested range is negative or does not
    fit the 16-bit address space.
    """
    addr = args.addr
    length = args.length

    print("SkyWalker-1 RAM Read")
    print('=' * 40)

    # The address is sent in wValue, a 16-bit field of the USB setup
    # packet; reject ranges that cannot be addressed.
    if addr < 0 or length < 0 or addr + length > 0x10000:
        print(f"\n Invalid range: 0x{addr:X} + {length} bytes does not fit "
              f"the 16-bit address space (0x0000-0xFFFF)")
        sys.exit(1)

    dev = find_device(force=args.force)
    intf = detach_driver(dev)

    try:
        print(f"\nReading {length} bytes from 0x{addr:04X}...\n")

        data = bytearray()
        errors = 0

        for offset in range(0, length, CHUNK_SIZE):
            chunk_len = min(CHUNK_SIZE, length - offset)
            chunk = fx2_ram_read(dev, addr + offset, chunk_len)
            if chunk is None or len(chunk) != chunk_len:
                errors += 1
                # Pad (or trim) to exactly chunk_len so later rows keep
                # their correct addresses.
                chunk = (chunk or b'')[:chunk_len].ljust(chunk_len, b'\xff')
            data.extend(chunk)

        # Hex dump output
        for i in range(0, len(data), 16):
            row = data[i:i + 16]
            hex_part = ' '.join(f'{b:02X}' for b in row)
            ascii_part = ''.join(
                chr(b) if 0x20 <= b < 0x7F else '.' for b in row)
            print(f" {addr + i:04X}: {hex_part:<48s} {ascii_part}")

        print(f"\n {len(data)} bytes read, {errors} chunk errors")

        if args.output:
            with open(args.output, 'wb') as f:
                f.write(data)
            print(f" Saved to: {args.output}")

    finally:
        if intf is not None:
            try:
                usb.util.release_interface(dev, intf)
                dev.attach_kernel_driver(intf)
                print("\nRe-attached kernel driver")
            except (usb.core.USBError, NotImplementedError):
                # Only USB-level failures are expected here; Ctrl-C and
                # programming errors are allowed to propagate.
                print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")


def main(argv=None):
    """
    Parse the command line and run the selected subcommand.

    Args:
        argv: Argument list without the program name. None (the default)
            makes argparse read sys.argv[1:].

    Without a subcommand the help text is printed and the process exits
    with status 0.
    """
    parser = argparse.ArgumentParser(
        description="SkyWalker-1 RAM firmware loader (FX2 vendor request 0xA0)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
  %(prog)s load firmware.ihx
  %(prog)s load firmware.bix --wait 5
  %(prog)s load firmware.ihx --no-reset
  %(prog)s reset
  %(prog)s read --addr 0x0000 --len 256
  %(prog)s read --addr 0xe600 --len 1

This tool loads firmware into RAM only -- the EEPROM is never touched.
Power-cycle the device to restore the factory-programmed firmware.
""")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Show detailed transfer progress")
    parser.add_argument('--force', action='store_true',
                        help="Allow loading to unknown VID/PID devices")

    sub = parser.add_subparsers(dest='command')

    # Flags that exist both at top level and on a subcommand use
    # default=SUPPRESS on the subcommand. argparse copies every attribute of
    # the subcommand's namespace over the top-level one, so a plain False
    # default there would silently undo `prog -v load ...` or
    # `prog --force reset`. With SUPPRESS the attribute is only set when the
    # flag is actually given after the subcommand.

    # load
    p_load = sub.add_parser('load',
                            help='Load firmware into FX2 RAM')
    p_load.add_argument('file', help='Firmware file (.ihx, .hex, .bix, .bin)')
    p_load.add_argument('--no-reset', action='store_true',
                        help="Load without halting/starting the CPU")
    p_load.add_argument('--wait', type=float, default=0, metavar='SECONDS',
                        help="Wait for USB re-enumeration after load")
    p_load.add_argument('-v', '--verbose', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Show detailed transfer progress")
    p_load.add_argument('--force', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Allow loading to unknown VID/PID devices")

    # reset
    p_reset = sub.add_parser('reset',
                             help='Reset the FX2 CPU (halt then start)')
    p_reset.add_argument('--wait', type=float, default=0, metavar='SECONDS',
                         help="Wait for USB re-enumeration after reset")
    p_reset.add_argument('--force', action='store_true',
                         default=argparse.SUPPRESS,
                         help="Allow reset on unknown VID/PID devices")

    # read
    p_read = sub.add_parser('read',
                            help='Read and hex-dump FX2 RAM')
    p_read.add_argument('--addr', type=lambda x: int(x, 0), default=0x0000,
                        help="Start address (default: 0x0000)")
    p_read.add_argument('--len', dest='length', type=lambda x: int(x, 0),
                        default=256,
                        help="Number of bytes to read (default: 256)")
    p_read.add_argument('-o', '--output', metavar='FILE',
                        help="Save raw bytes to file")
    p_read.add_argument('--force', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Allow read on unknown VID/PID devices")

    args = parser.parse_args(argv)

    # No subcommand given: show usage instead of guessing one.
    if not args.command:
        parser.print_help()
        sys.exit(0)

    # argparse has already rejected unknown subcommands, so the lookup
    # cannot miss.
    handlers = {
        'load': cmd_load,
        'reset': cmd_reset,
        'read': cmd_read,
    }
    handlers[args.command](args)


# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()