skywalker-1/tools/fw_load.py
Ryan Malloy 5710584267 Add custom FX2 firmware and RAM loader for open-source development
Custom firmware (SDCC + fx2lib) implements all stock vendor commands
(0x80-0x94) plus new commands for spectrum sweep (0xB0), raw BCM4500
register access (0xB1/0xB2), and blind scan (0xB3). Compiles to 6.3KB
of code with healthy RAM margins.

RAM loader (fw_load.py) uses the FX2 0xA0 vendor request to load
firmware into RAM without touching EEPROM -- power cycle restores
factory firmware. Supports Intel HEX and raw binary formats.
2026-02-11 19:46:50 -07:00

617 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Genpix SkyWalker-1 RAM firmware loader.
Loads firmware into the Cypress FX2 (CY7C68013A) internal/external RAM
via the standard 0xA0 vendor request. This does NOT touch the EEPROM --
power-cycling the device restores the factory-programmed firmware.
Use case: firmware development and testing. Load, test, power-cycle.
Loading sequence:
1. Halt CPU: write 0x01 to CPUCS register at 0xE600
2. Write code segments into RAM
3. Start CPU: write 0x00 to CPUCS at 0xE600
After starting, the FX2 runs the new firmware and typically
re-enumerates on USB with new VID/PID/descriptors.
Supports Intel HEX (.ihx/.hex) and raw binary (.bix/.bin) formats.
"""
import sys
import argparse
import time
import os
try:
import usb.core
import usb.util
except ImportError:
print("pyusb required: pip install pyusb")
sys.exit(1)
# USB IDs of the Genpix SkyWalker-1 (probed first by find_device)
SKYWALKER_VID = 0x09C0
SKYWALKER_PID = 0x0203
# USB IDs of a bare/unprogrammed Cypress FX2 (no EEPROM or blank EEPROM);
# probed as the fallback target
CYPRESS_VID = 0x04B4
CYPRESS_PID = 0x8613
# FX2 vendor request for RAM access (built into silicon boot ROM).
# Used as bRequest for both the read and the write control transfers.
FX2_RAM_REQUEST = 0xA0
# CPUCS register -- controls 8051 run/halt state.
# This tool writes 0x01 here to halt the CPU and 0x00 to start it.
CPUCS_ADDR = 0xE600
# Max bytes per control transfer. The FX2 TRM says 64 bytes for
# the control endpoint buffer, so we stay conservative.
# Larger reads/writes are split into pieces of this size.
CHUNK_SIZE = 64
def find_device(force=False):
    """Locate the target device on USB and return its pyusb handle.

    A SkyWalker-1 is looked for first, then a bare Cypress FX2. The first
    match is announced on stdout and returned. If neither is present a
    diagnostic is printed (its wording depends on *force*) and the process
    exits with status 1.
    """
    candidates = (
        ("SkyWalker-1", SKYWALKER_VID, SKYWALKER_PID),
        ("bare Cypress FX2", CYPRESS_VID, CYPRESS_PID),
    )
    for label, vid, pid in candidates:
        found = usb.core.find(idVendor=vid, idProduct=pid)
        if found is None:
            continue
        print(f"Found {label}: Bus {found.bus} Addr {found.address} "
              f"(VID 0x{vid:04X} PID 0x{pid:04X})")
        return found

    # Nothing matched: explain why and give up.
    if not force:
        print("No SkyWalker-1 (09C0:0203) or bare FX2 (04B4:8613) found.")
        print("Is the device plugged in?")
    else:
        print("No SkyWalker-1 or bare FX2 found. --force is set but no "
              "target device discovered.")
    sys.exit(1)
def detach_driver(dev):
    """Detach any active kernel driver, then select the default configuration.

    Every interface of every configuration is checked. If a driver is
    attached and cannot be detached, a hint is printed and the process
    exits with status 1.

    Returns the number of the last interface whose driver was detached,
    or None if no driver had to be detached.
    """
    intf_num = None
    for cfg in dev:
        for intf in cfg:
            number = intf.bInterfaceNumber
            try:
                active = dev.is_kernel_driver_active(number)
            except NotImplementedError:
                # Backend/platform has no kernel-driver concept: nothing
                # can be attached, so there is nothing to detach.
                active = False
            if not active:
                continue
            try:
                dev.detach_kernel_driver(number)
            except usb.core.USBError as e:
                print(f"Cannot detach driver: {e}")
                print("Try: sudo modprobe -r dvb_usb_gp8psk")
                sys.exit(1)
            intf_num = number

    # Best effort: control transfers on endpoint 0 do not depend on this
    # succeeding, so a USB-level failure is reported but not fatal. Only
    # USBError is tolerated; anything else (e.g. Ctrl-C) propagates.
    try:
        dev.set_configuration()
    except usb.core.USBError as e:
        print(f"Note: set_configuration failed ({e}); continuing")
    return intf_num
def fx2_ram_write(dev, addr, data):
    """Send *data* to FX2 RAM at *addr* using vendor request 0xA0.

    The address travels in wValue; the transfer times out after 2000 ms.
    Returns whatever ``dev.ctrl_transfer`` returns for the OUT transfer.
    """
    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT
    return dev.ctrl_transfer(
        request_type,
        FX2_RAM_REQUEST,
        wValue=addr,
        wIndex=0,
        data_or_wLength=data,
        timeout=2000,
    )
def fx2_ram_read(dev, addr, length):
    """Fetch *length* bytes of FX2 RAM starting at *addr* (vendor request 0xA0).

    Returns the data as ``bytes``, or None when the control transfer
    raises ``usb.core.USBError``.
    """
    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN
    try:
        payload = dev.ctrl_transfer(
            request_type,
            FX2_RAM_REQUEST,
            wValue=addr,
            wIndex=0,
            data_or_wLength=length,
            timeout=2000,
        )
    except usb.core.USBError:
        return None
    return bytes(payload)
def cpu_halt(dev):
    """Put the FX2's 8051 core into halt by writing 0x01 to CPUCS."""
    halt_value = b"\x01"
    fx2_ram_write(dev, CPUCS_ADDR, halt_value)
def cpu_start(dev):
    """Let the FX2's 8051 core run by writing 0x00 to CPUCS."""
    run_value = b"\x00"
    fx2_ram_write(dev, CPUCS_ADDR, run_value)
# -- Intel HEX parser --
def parse_ihx(data):
    """
    Parse an Intel HEX file. Returns list of (address, bytes) segments.

    *data* is the whole file content, as ``str`` or ``bytes``. Blank lines
    are skipped and parsing stops at the first EOF record. One segment is
    produced per data record, in file order (no merging is done here).

    Record types:
      00 = data
      01 = EOF
      02 = extended segment address (base = value << 4)
      04 = extended linear address (base = value << 16)
    Any other record type is checksum-verified and then ignored.

    Raises:
        ValueError: on a missing start code, non-hex characters, a record
            shorter than its header claims, a checksum mismatch, or a
            type 02/04 record whose payload is not exactly 2 bytes. The
            message always names the offending line number.
    """
    segments = []
    base_addr = 0

    for line_num, raw_line in enumerate(data.splitlines(), start=1):
        line = raw_line.strip()
        if not line:
            continue
        if isinstance(line, bytes):
            line = line.decode('ascii', errors='replace')

        if not line.startswith(':'):
            raise ValueError(f"Line {line_num}: missing start code ':'")

        # Strip the colon and decode hex
        hex_str = line[1:]
        if len(hex_str) < 10:
            raise ValueError(f"Line {line_num}: too short")
        try:
            raw = bytes.fromhex(hex_str)
        except ValueError as exc:
            raise ValueError(f"Line {line_num}: invalid hex") from exc

        # bytes.fromhex() tolerates embedded whitespace, so the character
        # count above does not guarantee the 5 fixed bytes are present
        # (count, addr hi, addr lo, type, checksum).
        if len(raw) < 5:
            raise ValueError(f"Line {line_num}: too short")

        byte_count = raw[0]
        # Validate the length BEFORE indexing the checksum; otherwise a
        # truncated record escapes as IndexError instead of ValueError.
        if len(raw) < byte_count + 5:
            raise ValueError(
                f"Line {line_num}: data length mismatch "
                f"(header says {byte_count}, got {len(raw) - 5})")

        addr = (raw[1] << 8) | raw[2]
        rec_type = raw[3]
        rec_data = raw[4:4 + byte_count]
        checksum = raw[4 + byte_count]

        # Verify checksum (two's complement of sum of all bytes before it)
        calc_check = -sum(raw[:4 + byte_count]) & 0xFF
        if checksum != calc_check:
            raise ValueError(
                f"Line {line_num}: checksum mismatch "
                f"(expected 0x{calc_check:02X}, got 0x{checksum:02X})")

        if rec_type == 0x00:
            # Data record
            segments.append((base_addr + addr, bytes(rec_data)))
        elif rec_type == 0x01:
            # EOF
            break
        elif rec_type == 0x02:
            # Extended segment address
            if byte_count != 2:
                raise ValueError(
                    f"Line {line_num}: type 02 record must have 2 data bytes")
            base_addr = ((rec_data[0] << 8) | rec_data[1]) << 4
        elif rec_type == 0x04:
            # Extended linear address
            if byte_count != 2:
                raise ValueError(
                    f"Line {line_num}: type 04 record must have 2 data bytes")
            base_addr = ((rec_data[0] << 8) | rec_data[1]) << 16
        # Silently ignore unknown record types (03, 05, etc.)

    return segments
def coalesce_segments(segments):
    """
    Merge adjacent/overlapping segments into contiguous blocks.

    Segments are sorted by start address (stable, so equal addresses keep
    their input order). A segment that touches or overlaps the block being
    built extends it; where bytes overlap, the data already in the block
    is kept and only the new tail is appended. A gap between segments
    starts a new block -- gaps are never padded. Zero-length segments
    carry no data and are dropped.

    Returns list of (address, bytes), sorted by address, each non-empty.
    """
    # Drop empty records up front so they cannot become empty blocks.
    ordered = sorted((seg for seg in segments if seg[1]), key=lambda s: s[0])
    if not ordered:
        return []

    merged = []
    cur_addr, first_data = ordered[0]
    cur_data = bytearray(first_data)

    for addr, data in ordered[1:]:
        cur_end = cur_addr + len(cur_data)
        if addr > cur_end:
            # Gap: close the current block and start a new one.
            merged.append((cur_addr, bytes(cur_data)))
            cur_addr, cur_data = addr, bytearray(data)
            continue
        # Adjacent (overlap == 0) or overlapping (overlap > 0). Slicing
        # past the end yields b'', which covers fully-contained segments.
        overlap = cur_end - addr
        cur_data.extend(data[overlap:])

    merged.append((cur_addr, bytes(cur_data)))
    return merged
def load_firmware_file(path):
    """
    Load firmware from .ihx/.hex (Intel HEX) or .bix/.bin (raw binary).

    The format is chosen by extension; for any other extension the file is
    treated as Intel HEX if its first byte is ':' and as raw binary
    otherwise. Intel HEX content is parsed and coalesced; raw binary is
    returned as a single segment at address 0x0000.

    An empty raw-binary file (whatever its extension) prints a message and
    exits with status 1. An Intel HEX file without data records yields an
    empty list.

    Returns list of (address, bytes) segments.
    """
    ext = os.path.splitext(path)[1].lower()
    with open(path, 'rb') as f:
        raw = f.read()

    if ext in ('.ihx', '.hex'):
        is_hex = True
    elif ext in ('.bix', '.bin'):
        is_hex = False
    else:
        # Unknown extension: auto-detect from the Intel HEX start code.
        is_hex = raw.startswith(b':')

    if is_hex:
        return coalesce_segments(parse_ihx(raw))

    # Raw binary loads at address 0x0000
    if not raw:
        print(f"Empty file: {path}")
        sys.exit(1)
    return [(0x0000, raw)]
def write_segments(dev, segments, verbose=False):
    """
    Write firmware segments to FX2 RAM in CHUNK_SIZE pieces.

    Writing stops at the first failure -- either a USB error or a short
    write -- after printing a diagnostic, so the returned count reflects
    only bytes the device actually accepted. Callers detect failure by
    comparing the result with the expected total.

    With *verbose* set, a progress line is refreshed every 0x400 bytes.

    Returns total bytes written.
    """
    total = 0
    for seg_addr, seg_data in segments:
        seg_len = len(seg_data)
        seg_end = seg_addr + seg_len - 1
        print(f" 0x{seg_addr:04X}-0x{seg_end:04X} ({seg_len} bytes)")

        for offset in range(0, seg_len, CHUNK_SIZE):
            chunk = seg_data[offset:offset + CHUNK_SIZE]
            chunk_len = len(chunk)
            addr = seg_addr + offset
            try:
                written = fx2_ram_write(dev, addr, chunk)
            except usb.core.USBError as e:
                print(f"\n Write error at 0x{addr:04X}: {e}")
                return total
            if written != chunk_len:
                # Count only what was accepted and stop: carrying on would
                # leave a hole in RAM while reporting a full-size load.
                print(f"\n Short write at 0x{addr:04X}: "
                      f"sent {chunk_len}, wrote {written}")
                return total + written

            if verbose and offset % 0x400 == 0:
                pct = offset * 100 // seg_len
                print(f"\r 0x{addr:04X} [{pct:3d}%]", end="", flush=True)
            total += chunk_len

        if verbose and seg_len > CHUNK_SIZE:
            print(f"\r 0x{seg_addr + seg_len - 1:04X} [100%] ")
    return total
# -- Subcommand handlers --
def cmd_load(args):
    """Load firmware into FX2 RAM (handler for the ``load`` subcommand).

    Reads ``args.file``, ``args.force``, ``args.no_reset``,
    ``args.verbose`` and ``args.wait``. Sequence: parse the firmware file,
    print a summary, find the device, detach its kernel driver, halt the
    CPU, write all segments, start the CPU, optionally wait for
    re-enumeration. With ``--no-reset`` the halt/start steps are skipped
    and the kernel driver is re-attached afterwards.

    Exits with status 1 when the file is missing or has no segments, when
    a segment lies outside the 16-bit address space, when the device is
    unknown and ``--force`` is not set, or when not every byte could be
    written. In that last case the CPU is deliberately NOT started.
    """
    if not os.path.exists(args.file):
        print(f"File not found: {args.file}")
        sys.exit(1)

    # Parse firmware file
    segments = load_firmware_file(args.file)
    if not segments:
        print("No code segments found in firmware file")
        sys.exit(1)

    total_bytes = sum(len(d) for _, d in segments)
    min_addr = min(a for a, _ in segments)
    max_addr = max(a + len(d) - 1 for a, d in segments)

    print("SkyWalker-1 RAM Firmware Loader")
    print("=" * 40)
    print(f"\nFirmware: {args.file}")
    print(f" Segments: {len(segments)}")
    print(f" Total size: {total_bytes} bytes")
    print(f" Address: 0x{min_addr:04X} - 0x{max_addr:04X}")

    # The target address is sent in the 16-bit wValue field of the control
    # transfer, so anything above 0xFFFF cannot be addressed.
    if max_addr > 0xFFFF:
        print(f"\n ERROR: firmware extends to 0x{max_addr:X}, beyond the "
              f"16-bit address range (max 0xFFFF)")
        sys.exit(1)

    # Check for CPUCS region overlap (warn but don't block)
    for addr, data in segments:
        seg_end = addr + len(data) - 1
        if addr <= CPUCS_ADDR <= seg_end:
            print(f"\n WARNING: Segment at 0x{addr:04X}-0x{seg_end:04X} "
                  f"overlaps CPUCS (0x{CPUCS_ADDR:04X})")
            print(" The CPU halt/start writes to 0xE600 will clobber "
                  "this region")
    print()

    # Connect
    dev = find_device(force=args.force)

    # Check VID/PID if it's not a known device
    vid = dev.idVendor
    pid = dev.idProduct
    is_skywalker = (vid == SKYWALKER_VID and pid == SKYWALKER_PID)
    is_bare_fx2 = (vid == CYPRESS_VID and pid == CYPRESS_PID)
    if not is_skywalker and not is_bare_fx2 and not args.force:
        print(f"\n Unknown device VID 0x{vid:04X} PID 0x{pid:04X}")
        print(" Expected SkyWalker-1 (09C0:0203) or bare FX2 (04B4:8613)")
        print(" Use --force to override")
        sys.exit(1)

    intf = detach_driver(dev)
    try:
        # Step 1: Halt CPU
        if not args.no_reset:
            print("\n[1/3] Halting CPU (CPUCS = 0x01)...")
            cpu_halt(dev)
            time.sleep(0.05)
            # Verify halt
            readback = fx2_ram_read(dev, CPUCS_ADDR, 1)
            if readback and readback[0] & 0x01:
                print(" CPU halted")
            else:
                val = f"0x{readback[0]:02X}" if readback else "read failed"
                print(f" WARNING: CPUCS readback = {val} (expected 0x01)")
                print(" Proceeding anyway...")
        else:
            print("\n[1/3] Skipping CPU reset (--no-reset)")

        # Step 2: Load segments
        step = "2/3" if not args.no_reset else "2/2"
        print(f"\n[{step}] Loading {len(segments)} segment(s) into RAM...")
        written = write_segments(dev, segments, verbose=args.verbose)
        print(f"\n {written} bytes loaded")
        if written != total_bytes:
            # Never release the CPU onto a partially written image.
            print(f" ERROR: expected {total_bytes}, wrote {written}")
            if not args.no_reset:
                print(" CPU left halted -- reload the firmware or "
                      "power-cycle the device")
            sys.exit(1)

        # Step 3: Start CPU
        if not args.no_reset:
            print("\n[3/3] Starting CPU (CPUCS = 0x00)...")
            cpu_start(dev)
            print(" CPU released")
            print("\n Firmware is running. The device will re-enumerate")
            print(" with new USB descriptors if the firmware does so.")
            if args.wait:
                _wait_for_reenumeration(args.wait)
        else:
            print("\n Segments loaded (CPU not reset)")
    finally:
        # Only re-attach if we didn't just start new firmware
        # (the device may have already re-enumerated away)
        if args.no_reset and intf is not None:
            try:
                usb.util.release_interface(dev, intf)
                dev.attach_kernel_driver(intf)
                print("\nRe-attached kernel driver")
            except (usb.core.USBError, NotImplementedError):
                # Best effort only; tell the user how to recover manually.
                print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")
def _wait_for_reenumeration(timeout):
    """Wait for a USB device to re-appear after firmware load.

    Polls every 0.5 s for either known VID/PID pair (SkyWalker-1 or bare
    FX2) until *timeout* seconds have elapsed, printing a dot per
    unsuccessful poll. At least one poll is always made. Firmware that
    enumerates under a different VID/PID is not detected; the timeout
    message says so.
    """
    print(f"\n Waiting up to {timeout}s for re-enumeration...")
    # Monotonic clock: a wall-clock adjustment must not stretch or cut
    # the wait.
    deadline = time.monotonic() + timeout
    # Give the device a moment to disconnect, without overshooting a
    # short timeout.
    time.sleep(max(0.0, min(1.0, timeout)))

    known_ids = (
        (SKYWALKER_VID, SKYWALKER_PID),
        (CYPRESS_VID, CYPRESS_PID),
    )
    while True:
        # After loading custom firmware, VID/PID may differ; only the two
        # known pairs can be recognised here.
        for vid, pid in known_ids:
            dev = usb.core.find(idVendor=vid, idProduct=pid)
            if dev is not None:
                print(f" Device re-appeared: Bus {dev.bus} Addr {dev.address} "
                      f"(0x{vid:04X}:0x{pid:04X})")
                return
        if time.monotonic() >= deadline:
            break
        print(".", end="", flush=True)
        time.sleep(0.5)

    print(f"\n Timeout -- device did not re-enumerate within {timeout}s")
    print(" The firmware may use different VID/PID. Check 'lsusb'.")
def cmd_reset(args):
    """Handler for the ``reset`` subcommand: halt the 8051, then restart it.

    Reads ``args.force`` and ``args.wait``. The kernel driver is detached
    but not re-attached afterwards, since the device is expected to
    re-enumerate. The printed CPUCS values are the values written, not
    read back.
    """
    print("SkyWalker-1 CPU Reset")
    print("=" * 40)

    target = find_device(force=args.force)
    detach_driver(target)

    print("\nHalting CPU...")
    cpu_halt(target)
    time.sleep(0.05)
    print(" CPUCS = 0x01 (halted)")

    time.sleep(0.1)

    print("Starting CPU...")
    cpu_start(target)
    print(" CPUCS = 0x00 (running)")

    print("\nCPU reset complete. Device will re-enumerate.")
    if not args.wait:
        return
    _wait_for_reenumeration(args.wait)
def cmd_read(args):
    """Read and hex-dump FX2 RAM contents (handler for ``read``).

    Reads ``args.addr``, ``args.length``, ``args.output`` and
    ``args.force``. RAM is fetched in CHUNK_SIZE pieces; a chunk that
    cannot be read is counted as an error and shown as 0xFF filler, both
    in the dump and in the optional output file. The kernel driver is
    re-attached when done, if one was detached.

    Exits with status 1 if the requested range does not fit in the 16-bit
    address space.
    """
    addr = args.addr
    length = args.length

    print("SkyWalker-1 RAM Read")
    print("=" * 40)

    # The address is sent in the 16-bit wValue field; reject ranges it
    # cannot express rather than dump data under wrong address labels.
    if addr < 0 or length < 0 or addr + length > 0x10000:
        print(f"\n Invalid range: {length} bytes from {addr:#x} is outside "
              f"the 16-bit address space (0x0000-0xFFFF)")
        sys.exit(1)

    dev = find_device(force=args.force)
    intf = detach_driver(dev)
    try:
        print(f"\nReading {length} bytes from 0x{addr:04X}...\n")
        data = bytearray()
        errors = 0
        for offset in range(0, length, CHUNK_SIZE):
            chunk_len = min(CHUNK_SIZE, length - offset)
            chunk = fx2_ram_read(dev, addr + offset, chunk_len)
            if chunk is None:
                errors += 1
                # Keep the dump aligned: stand in 0xFF for unread bytes.
                data.extend(b'\xff' * chunk_len)
            else:
                data.extend(chunk)

        # Hex dump output
        for i in range(0, len(data), 16):
            row = data[i:i + 16]
            hex_part = ' '.join(f'{b:02X}' for b in row)
            ascii_part = ''.join(
                chr(b) if 0x20 <= b < 0x7F else '.' for b in row)
            print(f" {addr + i:04X}: {hex_part:<48s} {ascii_part}")
        print(f"\n {len(data)} bytes read, {errors} chunk errors")

        if args.output:
            with open(args.output, 'wb') as f:
                f.write(data)
            print(f" Saved to: {args.output}")
    finally:
        if intf is not None:
            try:
                usb.util.release_interface(dev, intf)
                dev.attach_kernel_driver(intf)
                print("\nRe-attached kernel driver")
            except (usb.core.USBError, NotImplementedError):
                # Best effort only; tell the user how to recover manually.
                print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")
def main():
    """Parse the command line and run the selected subcommand.

    ``-v/--verbose`` and ``--force`` are accepted both before and after
    the subcommand name. The subcommand-level copies use
    ``default=argparse.SUPPRESS`` so that, when omitted, they do not
    overwrite a value set by the top-level flag: argparse copies every
    attribute a subparser sets, defaults included, onto the main
    namespace.

    With no subcommand, the help text is printed and the process exits
    with status 0.
    """
    parser = argparse.ArgumentParser(
        description="SkyWalker-1 RAM firmware loader (FX2 vendor request 0xA0)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
%(prog)s load firmware.ihx
%(prog)s load firmware.bix --wait 5
%(prog)s load firmware.ihx --no-reset
%(prog)s reset
%(prog)s read --addr 0x0000 --len 256
%(prog)s read --addr 0xe600 --len 1
This tool loads firmware into RAM only -- the EEPROM is never touched.
Power-cycle the device to restore the factory-programmed firmware.
""")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Show detailed transfer progress")
    parser.add_argument('--force', action='store_true',
                        help="Allow loading to unknown VID/PID devices")

    sub = parser.add_subparsers(dest='command')

    # load
    p_load = sub.add_parser('load',
                            help='Load firmware into FX2 RAM')
    p_load.add_argument('file', help='Firmware file (.ihx, .hex, .bix, .bin)')
    p_load.add_argument('--no-reset', action='store_true',
                        help="Load without halting/starting the CPU")
    p_load.add_argument('--wait', type=float, default=0, metavar='SECONDS',
                        help="Wait for USB re-enumeration after load")
    p_load.add_argument('-v', '--verbose', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Show detailed transfer progress")
    p_load.add_argument('--force', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Allow loading to unknown VID/PID devices")

    # reset
    p_reset = sub.add_parser('reset',
                             help='Reset the FX2 CPU (halt then start)')
    p_reset.add_argument('--wait', type=float, default=0, metavar='SECONDS',
                         help="Wait for USB re-enumeration after reset")
    p_reset.add_argument('--force', action='store_true',
                         default=argparse.SUPPRESS,
                         help="Allow reset on unknown VID/PID devices")

    # read
    p_read = sub.add_parser('read',
                            help='Read and hex-dump FX2 RAM')
    p_read.add_argument('--addr', type=lambda x: int(x, 0), default=0x0000,
                        help="Start address (default: 0x0000)")
    p_read.add_argument('--len', dest='length', type=lambda x: int(x, 0),
                        default=256,
                        help="Number of bytes to read (default: 256)")
    p_read.add_argument('-o', '--output', metavar='FILE',
                        help="Save raw bytes to file")
    p_read.add_argument('--force', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Allow read on unknown VID/PID devices")

    args = parser.parse_args()

    # No subcommand given: show usage instead of guessing one.
    if not args.command:
        parser.print_help()
        sys.exit(0)

    dispatch = {
        'load': cmd_load,
        'reset': cmd_reset,
        'read': cmd_read,
    }
    handler = dispatch.get(args.command)
    if handler is None:
        # Not reachable through argparse's own choice validation; kept as
        # a guard in case a subparser is added without a handler.
        parser.print_help()
        sys.exit(1)
    handler(args)


if __name__ == '__main__':
    main()