Firmware: Rewrite skywalker1.c for EEPROM boot experiment — tests whether I2C hardware controller works after FX2 boot ROM completes EEPROM load (bypassing the CPUCS restart that triggers BERR). Tools: - fw_load.py: Add I2C cleanup stub, pre-halt register flush, improved error handling and segment loading - eeprom_write.py: Add IHX→C2 EEPROM image converter (16KB format with length-prefixed segments, checksum) - eeprom_dump.py: Refactor for cleaner output, better hex display - skywalker_lib.py: Minor I2C register constant updates Docs: - EEPROM-RECOVERY.md: Four recovery options for soft-bricked device (SOIC clip, SDA pull-up, desolder, wait-for-timeout) - Master reference: Updated with EEPROM boot findings Status: EEPROM flash blocked — stock firmware I2C proxy returns pipe errors, host-side 0xA0 writes proven unable to drive peripheral bus. Device boot ROM intermittently hangs on EEPROM I2C read (~3-6% success).
981 lines
34 KiB
Python
Executable File
981 lines
34 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Genpix SkyWalker-1 RAM firmware loader.
|
|
|
|
Loads firmware into the Cypress FX2 (CY7C68013A) internal/external RAM
|
|
via the standard 0xA0 vendor request. This does NOT touch the EEPROM --
|
|
power-cycling the device restores the factory-programmed firmware.
|
|
|
|
Use case: firmware development and testing. Load, test, power-cycle.
|
|
|
|
Loading sequence:
|
|
1. Halt CPU: write 0x01 to CPUCS register at 0xE600
|
|
2. Write code segments into RAM
|
|
3. Start CPU: write 0x00 to CPUCS at 0xE600
|
|
|
|
After starting, the FX2 runs the new firmware and typically
|
|
re-enumerates on USB with new VID/PID/descriptors.
|
|
|
|
Supports Intel HEX (.ihx/.hex) and raw binary (.bix/.bin) formats.
|
|
"""
|
|
|
|
import sys
|
|
import argparse
|
|
import time
|
|
import os
|
|
|
|
try:
|
|
import usb.core
|
|
import usb.util
|
|
except ImportError:
|
|
print("pyusb required: pip install pyusb")
|
|
sys.exit(1)
|
|
|
|
# Genpix SkyWalker-1
|
|
SKYWALKER_VID = 0x09C0
|
|
SKYWALKER_PID = 0x0203
|
|
|
|
# Bare/unprogrammed Cypress FX2 (no EEPROM or blank EEPROM)
|
|
CYPRESS_VID = 0x04B4
|
|
CYPRESS_PID = 0x8613
|
|
|
|
# FX2 vendor request for RAM access (built into silicon boot ROM)
|
|
FX2_RAM_REQUEST = 0xA0
|
|
|
|
# CPUCS register -- controls 8051 run/halt state
|
|
CPUCS_ADDR = 0xE600
|
|
|
|
# Max bytes per control transfer. The FX2 TRM says 64 bytes for
|
|
# the control endpoint buffer, so we stay conservative.
|
|
CHUNK_SIZE = 64
|
|
|
|
|
|
def find_device(force=False):
|
|
"""Find a SkyWalker-1 or bare FX2 device on USB."""
|
|
# Try SkyWalker-1 first
|
|
dev = usb.core.find(idVendor=SKYWALKER_VID, idProduct=SKYWALKER_PID)
|
|
if dev is not None:
|
|
print(f"Found SkyWalker-1: Bus {dev.bus} Addr {dev.address} "
|
|
f"(VID 0x{SKYWALKER_VID:04X} PID 0x{SKYWALKER_PID:04X})")
|
|
return dev
|
|
|
|
# Try bare Cypress FX2
|
|
dev = usb.core.find(idVendor=CYPRESS_VID, idProduct=CYPRESS_PID)
|
|
if dev is not None:
|
|
print(f"Found bare Cypress FX2: Bus {dev.bus} Addr {dev.address} "
|
|
f"(VID 0x{CYPRESS_VID:04X} PID 0x{CYPRESS_PID:04X})")
|
|
return dev
|
|
|
|
if force:
|
|
# Last resort: scan for any device the user might want
|
|
print("No SkyWalker-1 or bare FX2 found. --force is set but no "
|
|
"target device discovered.")
|
|
else:
|
|
print("No SkyWalker-1 (09C0:0203) or bare FX2 (04B4:8613) found.")
|
|
print("Is the device plugged in?")
|
|
sys.exit(1)
|
|
|
|
|
|
def detach_driver(dev):
|
|
"""Detach kernel driver if attached. Returns interface number or None."""
|
|
intf_num = None
|
|
for cfg in dev:
|
|
for intf in cfg:
|
|
if dev.is_kernel_driver_active(intf.bInterfaceNumber):
|
|
try:
|
|
dev.detach_kernel_driver(intf.bInterfaceNumber)
|
|
intf_num = intf.bInterfaceNumber
|
|
except usb.core.USBError as e:
|
|
print(f"Cannot detach driver: {e}")
|
|
print("Try: sudo modprobe -r dvb_usb_gp8psk")
|
|
sys.exit(1)
|
|
try:
|
|
dev.set_configuration()
|
|
except:
|
|
pass
|
|
return intf_num
|
|
|
|
|
|
def fx2_ram_write(dev, addr, data):
|
|
"""Write bytes to FX2 RAM at the given address via vendor request 0xA0."""
|
|
return dev.ctrl_transfer(
|
|
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT,
|
|
FX2_RAM_REQUEST, addr, 0, data, 2000)
|
|
|
|
|
|
def fx2_ram_read(dev, addr, length):
|
|
"""Read bytes from FX2 RAM at the given address via vendor request 0xA0."""
|
|
try:
|
|
data = dev.ctrl_transfer(
|
|
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
|
|
FX2_RAM_REQUEST, addr, 0, length, 2000)
|
|
return bytes(data)
|
|
except usb.core.USBError:
|
|
return None
|
|
|
|
|
|
def cpu_halt(dev):
|
|
"""Halt the FX2 8051 CPU by writing 0x01 to CPUCS."""
|
|
fx2_ram_write(dev, CPUCS_ADDR, bytes([0x01]))
|
|
|
|
|
|
def cpu_start(dev):
|
|
"""Start the FX2 8051 CPU by writing 0x00 to CPUCS."""
|
|
fx2_ram_write(dev, CPUCS_ADDR, bytes([0x00]))
|
|
|
|
|
|
# -- Intel HEX parser --
|
|
|
|
def parse_ihx(data):
|
|
"""
|
|
Parse an Intel HEX file. Returns list of (address, bytes) segments.
|
|
|
|
Record types:
|
|
00 = data
|
|
01 = EOF
|
|
02 = extended segment address (shifts base by 16)
|
|
04 = extended linear address (shifts base by 16)
|
|
"""
|
|
segments = []
|
|
base_addr = 0
|
|
line_num = 0
|
|
|
|
for raw_line in data.splitlines():
|
|
line_num += 1
|
|
line = raw_line.strip()
|
|
if not line:
|
|
continue
|
|
if isinstance(line, bytes):
|
|
line = line.decode('ascii', errors='replace')
|
|
|
|
if not line.startswith(':'):
|
|
raise ValueError(f"Line {line_num}: missing start code ':'")
|
|
|
|
# Strip the colon and decode hex
|
|
hex_str = line[1:]
|
|
if len(hex_str) < 10:
|
|
raise ValueError(f"Line {line_num}: too short")
|
|
|
|
try:
|
|
raw = bytes.fromhex(hex_str)
|
|
except ValueError:
|
|
raise ValueError(f"Line {line_num}: invalid hex")
|
|
|
|
byte_count = raw[0]
|
|
addr = (raw[1] << 8) | raw[2]
|
|
rec_type = raw[3]
|
|
rec_data = raw[4:4 + byte_count]
|
|
checksum = raw[4 + byte_count]
|
|
|
|
# Verify checksum (two's complement of sum of all bytes before it)
|
|
calc_sum = sum(raw[:4 + byte_count]) & 0xFF
|
|
calc_check = (~calc_sum + 1) & 0xFF
|
|
if checksum != calc_check:
|
|
raise ValueError(
|
|
f"Line {line_num}: checksum mismatch "
|
|
f"(expected 0x{calc_check:02X}, got 0x{checksum:02X})")
|
|
|
|
if len(rec_data) != byte_count:
|
|
raise ValueError(
|
|
f"Line {line_num}: data length mismatch "
|
|
f"(header says {byte_count}, got {len(rec_data)})")
|
|
|
|
if rec_type == 0x00:
|
|
# Data record
|
|
full_addr = base_addr + addr
|
|
segments.append((full_addr, bytes(rec_data)))
|
|
|
|
elif rec_type == 0x01:
|
|
# EOF
|
|
break
|
|
|
|
elif rec_type == 0x02:
|
|
# Extended segment address
|
|
if byte_count != 2:
|
|
raise ValueError(
|
|
f"Line {line_num}: type 02 record must have 2 data bytes")
|
|
base_addr = ((rec_data[0] << 8) | rec_data[1]) << 4
|
|
|
|
elif rec_type == 0x04:
|
|
# Extended linear address
|
|
if byte_count != 2:
|
|
raise ValueError(
|
|
f"Line {line_num}: type 04 record must have 2 data bytes")
|
|
base_addr = ((rec_data[0] << 8) | rec_data[1]) << 16
|
|
|
|
# Silently ignore unknown record types (03, 05, etc.)
|
|
|
|
return segments
|
|
|
|
|
|
def coalesce_segments(segments):
|
|
"""
|
|
Merge adjacent/overlapping segments into contiguous blocks.
|
|
Returns list of (address, bytes) with no gaps.
|
|
"""
|
|
if not segments:
|
|
return []
|
|
|
|
# Sort by address
|
|
sorted_segs = sorted(segments, key=lambda s: s[0])
|
|
|
|
merged = []
|
|
cur_addr, cur_data = sorted_segs[0]
|
|
cur_data = bytearray(cur_data)
|
|
|
|
for addr, data in sorted_segs[1:]:
|
|
cur_end = cur_addr + len(cur_data)
|
|
if addr <= cur_end:
|
|
# Overlapping or adjacent -- extend or overwrite
|
|
overlap = cur_end - addr
|
|
if overlap >= 0:
|
|
cur_data.extend(data[overlap:] if overlap < len(data) else b'')
|
|
else:
|
|
# Gap -- pad with zeros (shouldn't happen after sort, but safe)
|
|
cur_data.extend(b'\x00' * (-overlap))
|
|
cur_data.extend(data)
|
|
else:
|
|
merged.append((cur_addr, bytes(cur_data)))
|
|
cur_addr = addr
|
|
cur_data = bytearray(data)
|
|
|
|
merged.append((cur_addr, bytes(cur_data)))
|
|
return merged
|
|
|
|
|
|
def load_firmware_file(path):
|
|
"""
|
|
Load firmware from .ihx/.hex (Intel HEX) or .bix/.bin (raw binary).
|
|
Returns list of (address, bytes) segments.
|
|
"""
|
|
ext = os.path.splitext(path)[1].lower()
|
|
|
|
with open(path, 'rb') as f:
|
|
raw = f.read()
|
|
|
|
if ext in ('.ihx', '.hex'):
|
|
segments = parse_ihx(raw)
|
|
segments = coalesce_segments(segments)
|
|
return segments
|
|
|
|
elif ext in ('.bix', '.bin'):
|
|
# Raw binary loads at address 0x0000
|
|
if not raw:
|
|
print(f"Empty file: {path}")
|
|
sys.exit(1)
|
|
return [(0x0000, raw)]
|
|
|
|
else:
|
|
# Try to auto-detect: if it starts with ':', assume Intel HEX
|
|
if raw.startswith(b':'):
|
|
segments = parse_ihx(raw)
|
|
segments = coalesce_segments(segments)
|
|
return segments
|
|
else:
|
|
# Treat as raw binary
|
|
return [(0x0000, raw)]
|
|
|
|
|
|
def write_segments(dev, segments, verbose=False):
|
|
"""
|
|
Write firmware segments to FX2 RAM in CHUNK_SIZE pieces.
|
|
Returns total bytes written.
|
|
"""
|
|
total = 0
|
|
|
|
for seg_addr, seg_data in segments:
|
|
seg_len = len(seg_data)
|
|
seg_end = seg_addr + seg_len - 1
|
|
print(f" 0x{seg_addr:04X}-0x{seg_end:04X} ({seg_len} bytes)")
|
|
|
|
offset = 0
|
|
while offset < seg_len:
|
|
chunk_len = min(CHUNK_SIZE, seg_len - offset)
|
|
chunk = seg_data[offset:offset + chunk_len]
|
|
addr = seg_addr + offset
|
|
|
|
try:
|
|
written = fx2_ram_write(dev, addr, chunk)
|
|
if written != chunk_len:
|
|
print(f"\n Short write at 0x{addr:04X}: "
|
|
f"sent {chunk_len}, wrote {written}")
|
|
except usb.core.USBError as e:
|
|
print(f"\n Write error at 0x{addr:04X}: {e}")
|
|
return total
|
|
|
|
if verbose and offset % 0x400 == 0:
|
|
pct = offset * 100 // seg_len
|
|
print(f"\r 0x{addr:04X} [{pct:3d}%]", end="", flush=True)
|
|
|
|
total += chunk_len
|
|
offset += chunk_len
|
|
|
|
if verbose and seg_len > CHUNK_SIZE:
|
|
print(f"\r 0x{seg_addr + seg_len - 1:04X} [100%] ")
|
|
|
|
return total
|
|
|
|
|
|
# -- I2C controller cleanup --
|
|
|
|
# FX2LP I2C controller XDATA registers (accessible via vendor request 0xA0)
|
|
I2CS_ADDR = 0xE678 # I2C Control/Status
|
|
I2DAT_ADDR = 0xE679 # I2C Data
|
|
I2CTL_ADDR = 0xE67A # I2C Control
|
|
|
|
# I2CS bit definitions
|
|
I2CS_START = 0x80
|
|
I2CS_STOP = 0x40
|
|
I2CS_LASTRD = 0x20
|
|
I2CS_ID1 = 0x10
|
|
I2CS_ID0 = 0x08
|
|
I2CS_BERR = 0x04
|
|
I2CS_ACK = 0x02
|
|
I2CS_DONE = 0x01
|
|
|
|
def i2cs_decode(val):
|
|
"""Decode I2CS register value into human-readable string."""
|
|
flags = []
|
|
for bit, name in [(7, 'START'), (6, 'STOP'), (5, 'LASTRD'),
|
|
(4, 'ID1'), (3, 'ID0'), (2, 'BERR'),
|
|
(1, 'ACK'), (0, 'DONE')]:
|
|
if val & (1 << bit):
|
|
flags.append(name)
|
|
id_val = (val >> 3) & 0x03
|
|
state = {0: 'idle', 1: 'data', 2: 'addr-wait', 3: 'busy'}[id_val]
|
|
return f"0x{val:02X} ({' | '.join(flags) if flags else 'idle'}) [state={state}]"
|
|
|
|
|
|
def build_i2c_cleanup_stub():
|
|
"""Build a tiny 8051 stub that terminates any stuck I2C transaction.
|
|
|
|
USB vendor request 0xA0 can READ I2C registers but WRITES are ignored
|
|
by the I2C controller during CPU halt (the peripheral only recognizes
|
|
8051-initiated XDATA writes). So we need the 8051 itself to do the
|
|
cleanup.
|
|
|
|
The stub:
|
|
1. Reads I2CS
|
|
2. If BERR: clears it
|
|
3. If mid-transaction (ID bits): reads I2DAT, sends STOP
|
|
4. If residual flags: sends STOP
|
|
5. Loops up to 10 times
|
|
6. Stores diagnostics at XDATA 0x3C00-0x3C07
|
|
7. Enters infinite loop (host halts CPU after reading diagnostics)
|
|
|
|
Diagnostics layout:
|
|
0x3C00: 0xAA = stub started
|
|
0x3C01: I2CS at entry (before any cleanup)
|
|
0x3C02: I2CS after cleanup (should be 0x00)
|
|
0x3C03: iteration count (how many loops needed)
|
|
0x3C04: I2DAT value read during cleanup
|
|
0x3C05: 0xDD = stub completed successfully
|
|
"""
|
|
# 8051 machine code — hand-assembled for clarity
|
|
code = []
|
|
|
|
# 0x0000: LJMP 0x0010 (skip interrupt vector area)
|
|
code += [0x02, 0x00, 0x10]
|
|
|
|
# Pad to 0x0010
|
|
while len(code) < 0x10:
|
|
code += [0x00]
|
|
|
|
# ========== 0x0010: Main code ==========
|
|
|
|
# --- Marker: stub started (0xAA → 0x3C00) ---
|
|
code += [0x74, 0xAA] # MOV A, #0xAA
|
|
code += [0x90, 0x3C, 0x00] # MOV DPTR, #0x3C00
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
|
|
# --- Read initial I2CS → 0x3C01 ---
|
|
code += [0x90, 0xE6, 0x78] # MOV DPTR, #0xE678 (I2CS)
|
|
code += [0xE0] # MOVX A, @DPTR
|
|
code += [0x90, 0x3C, 0x01] # MOV DPTR, #0x3C01
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
|
|
# --- Init loop counter R0=0 ---
|
|
code += [0x78, 0x00] # MOV R0, #0
|
|
|
|
# ========== CLEANUP LOOP ==========
|
|
loop_top = len(code)
|
|
|
|
# Increment loop counter
|
|
code += [0x08] # INC R0
|
|
|
|
# Read I2CS
|
|
code += [0x90, 0xE6, 0x78] # MOV DPTR, #0xE678 (I2CS)
|
|
code += [0xE0] # MOVX A, @DPTR (A = I2CS)
|
|
|
|
# If I2CS == 0x00 (idle), jump to done
|
|
code += [0x60] # JZ done (offset filled later)
|
|
jz_done_pc = len(code)
|
|
code += [0x00] # placeholder
|
|
|
|
# Check BERR (bit 2): if set, clear it
|
|
code += [0x30, 0xE2] # JNB ACC.2, skip_berr
|
|
jnb_berr_pc = len(code)
|
|
code += [0x00] # placeholder
|
|
# Clear BERR: write 0x04 to I2CS
|
|
code += [0x74, 0x04] # MOV A, #0x04
|
|
code += [0x90, 0xE6, 0x78] # MOV DPTR, #0xE678
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
# Small delay
|
|
code += [0x79, 100] # MOV R1, #100
|
|
code += [0xD9, 0xFE] # DJNZ R1, $-2
|
|
# Jump to loop check
|
|
code += [0x80] # SJMP loop_check
|
|
sjmp_check1_pc = len(code)
|
|
code += [0x00] # placeholder
|
|
|
|
skip_berr = len(code)
|
|
|
|
# Check ID bits (bits 3,4): if either set, flush I2DAT + STOP
|
|
code += [0x54, 0x18] # ANL A, #0x18 (mask ID bits)
|
|
code += [0x60] # JZ skip_id
|
|
jz_skip_id_pc = len(code)
|
|
code += [0x00] # placeholder
|
|
|
|
# Read I2DAT (flush pending data)
|
|
code += [0x90, 0xE6, 0x79] # MOV DPTR, #0xE679 (I2DAT)
|
|
code += [0xE0] # MOVX A, @DPTR
|
|
code += [0x90, 0x3C, 0x04] # MOV DPTR, #0x3C04
|
|
code += [0xF0] # MOVX @DPTR, A (save I2DAT)
|
|
# Small delay
|
|
code += [0x79, 100] # MOV R1, #100
|
|
code += [0xD9, 0xFE] # DJNZ R1, $-2
|
|
# Send STOP: write 0x40 to I2CS
|
|
code += [0x74, 0x40] # MOV A, #0x40
|
|
code += [0x90, 0xE6, 0x78] # MOV DPTR, #0xE678
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
# Longer delay for STOP to complete
|
|
code += [0x7A, 10] # MOV R2, #10
|
|
code += [0x79, 250] # MOV R1, #250
|
|
code += [0xD9, 0xFE] # DJNZ R1, $-2
|
|
code += [0xDA, 0xFC] # DJNZ R2, $-4
|
|
# Jump to loop check
|
|
code += [0x80] # SJMP loop_check
|
|
sjmp_check2_pc = len(code)
|
|
code += [0x00] # placeholder
|
|
|
|
skip_id = len(code)
|
|
|
|
# Residual flags — send STOP anyway
|
|
code += [0x74, 0x40] # MOV A, #0x40
|
|
code += [0x90, 0xE6, 0x78] # MOV DPTR, #0xE678
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
code += [0x7A, 10] # MOV R2, #10
|
|
code += [0x79, 250] # MOV R1, #250
|
|
code += [0xD9, 0xFE] # DJNZ R1, $-2
|
|
code += [0xDA, 0xFC] # DJNZ R2, $-4
|
|
|
|
loop_check = len(code)
|
|
|
|
# Loop up to 10 times
|
|
code += [0xB8, 10] # CJNE R0, #10, loop_top
|
|
cjne_pc = len(code)
|
|
code += [(loop_top - (cjne_pc + 1)) & 0xFF]
|
|
|
|
# ========== DONE ==========
|
|
done = len(code)
|
|
|
|
# Store final I2CS → 0x3C02
|
|
code += [0x90, 0xE6, 0x78] # MOV DPTR, #0xE678
|
|
code += [0xE0] # MOVX A, @DPTR
|
|
code += [0x90, 0x3C, 0x02] # MOV DPTR, #0x3C02
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
|
|
# Store iteration count → 0x3C03
|
|
code += [0xE8] # MOV A, R0
|
|
code += [0x90, 0x3C, 0x03] # MOV DPTR, #0x3C03
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
|
|
# Marker: stub done (0xDD → 0x3C05)
|
|
code += [0x74, 0xDD] # MOV A, #0xDD
|
|
code += [0x90, 0x3C, 0x05] # MOV DPTR, #0x3C05
|
|
code += [0xF0] # MOVX @DPTR, A
|
|
|
|
# Infinite loop
|
|
code += [0x80, 0xFE] # SJMP $ (loop forever)
|
|
|
|
# ========== Patch jump offsets ==========
|
|
# All relative jump offsets: target - (offset_byte_position + 1)
|
|
# because 8051 PC points to the NEXT instruction when the branch executes.
|
|
code[jz_done_pc] = (done - (jz_done_pc + 1)) & 0xFF
|
|
code[jnb_berr_pc] = (skip_berr - (jnb_berr_pc + 1)) & 0xFF
|
|
code[sjmp_check1_pc] = (loop_check - (sjmp_check1_pc + 1)) & 0xFF
|
|
code[jz_skip_id_pc] = (skip_id - (jz_skip_id_pc + 1)) & 0xFF
|
|
code[sjmp_check2_pc] = (loop_check - (sjmp_check2_pc + 1)) & 0xFF
|
|
|
|
return bytes(code)
|
|
|
|
|
|
def i2c_cleanup(dev):
|
|
"""Attempt host-side I2C controller recovery after CPU halt.
|
|
|
|
After halting the stock firmware mid-I2C-transaction, I2CS reads 0x1A
|
|
(mid-transaction, no BERR). BERR (0xF6) only appears on CPU restart.
|
|
|
|
Strategy: write STOP to I2CS from the host via 0xA0 vendor request
|
|
BEFORE restarting the CPU. If the I2C controller accepts host writes,
|
|
the pending transaction ends cleanly and our firmware gets a working
|
|
I2C controller.
|
|
|
|
Even if the controller doesn't process STOP while halted, latching the
|
|
bit in the register may cause the hardware to execute it on CPU restart,
|
|
preventing BERR from being set.
|
|
"""
|
|
print(f"\n I2C controller recovery (host-side):")
|
|
|
|
# 1. Read initial state
|
|
i2cs = fx2_ram_read(dev, I2CS_ADDR, 1)
|
|
if not i2cs:
|
|
print(f" I2CS read failed — skipping recovery")
|
|
return
|
|
i2cs_val = i2cs[0]
|
|
print(f" I2CS initial: {i2cs_decode(i2cs_val)}")
|
|
|
|
if i2cs_val == 0x00:
|
|
print(f" I2C controller idle — no recovery needed")
|
|
return
|
|
|
|
# Also read I2CTL for diagnostics
|
|
i2ctl = fx2_ram_read(dev, I2CTL_ADDR, 1)
|
|
if i2ctl:
|
|
speed = "400kHz" if i2ctl[0] & 0x01 else "100kHz"
|
|
print(f" I2CTL: 0x{i2ctl[0]:02X} ({speed})")
|
|
|
|
berr_set = bool(i2cs_val & I2CS_BERR)
|
|
|
|
if berr_set:
|
|
print(f" BERR already set at halt time — unusual")
|
|
# Try clearing BERR (write bit 2)
|
|
print(f" Writing 0x04 to I2CS (BERR clear)...")
|
|
fx2_ram_write(dev, I2CS_ADDR, bytes([I2CS_BERR]))
|
|
time.sleep(0.010)
|
|
i2cs2 = fx2_ram_read(dev, I2CS_ADDR, 1)
|
|
if i2cs2:
|
|
print(f" I2CS after: {i2cs_decode(i2cs2[0])}")
|
|
return
|
|
|
|
# 2. Mid-transaction — attempt recovery
|
|
id_bits = (i2cs_val >> 3) & 0x03
|
|
print(f" Transaction state: ID={id_bits} ({'idle' if id_bits == 0 else 'active'})")
|
|
|
|
# Strategy A: Read I2DAT to flush pending byte, then STOP
|
|
print(f" [A] Flushing I2DAT + STOP...")
|
|
i2dat = fx2_ram_read(dev, I2DAT_ADDR, 1)
|
|
if i2dat:
|
|
print(f" I2DAT read: 0x{i2dat[0]:02X}")
|
|
time.sleep(0.005)
|
|
|
|
fx2_ram_write(dev, I2CS_ADDR, bytes([I2CS_STOP]))
|
|
time.sleep(0.010)
|
|
|
|
i2cs_a = fx2_ram_read(dev, I2CS_ADDR, 1)
|
|
if i2cs_a:
|
|
print(f" I2CS after: {i2cs_decode(i2cs_a[0])}")
|
|
if i2cs_a[0] == 0x00 or (i2cs_a[0] & I2CS_DONE):
|
|
print(f" ✓ Recovery may have worked!")
|
|
return
|
|
|
|
# Strategy B: LASTRD + STOP (end read transaction cleanly)
|
|
print(f" [B] LASTRD + STOP...")
|
|
fx2_ram_write(dev, I2CS_ADDR, bytes([I2CS_LASTRD | I2CS_STOP]))
|
|
time.sleep(0.010)
|
|
|
|
i2cs_b = fx2_ram_read(dev, I2CS_ADDR, 1)
|
|
if i2cs_b:
|
|
print(f" I2CS after: {i2cs_decode(i2cs_b[0])}")
|
|
if i2cs_b[0] == 0x00 or (i2cs_b[0] & I2CS_DONE):
|
|
print(f" ✓ Recovery may have worked!")
|
|
return
|
|
|
|
# Strategy C: Just STOP again (in case controller needed time)
|
|
print(f" [C] Retry STOP...")
|
|
fx2_ram_write(dev, I2CS_ADDR, bytes([I2CS_STOP]))
|
|
time.sleep(0.020)
|
|
|
|
i2cs_c = fx2_ram_read(dev, I2CS_ADDR, 1)
|
|
if i2cs_c:
|
|
print(f" I2CS after: {i2cs_decode(i2cs_c[0])}")
|
|
|
|
# If BERR appeared during recovery attempts, try to clear it
|
|
if i2cs_c and (i2cs_c[0] & I2CS_BERR):
|
|
print(f" [D] BERR appeared — attempting clear...")
|
|
fx2_ram_write(dev, I2CS_ADDR, bytes([I2CS_BERR]))
|
|
time.sleep(0.010)
|
|
i2cs_d = fx2_ram_read(dev, I2CS_ADDR, 1)
|
|
if i2cs_d:
|
|
print(f" I2CS after: {i2cs_decode(i2cs_d[0])}")
|
|
|
|
# Final state
|
|
time.sleep(0.010)
|
|
i2cs_final = fx2_ram_read(dev, I2CS_ADDR, 1)
|
|
if i2cs_final:
|
|
print(f" I2CS final: {i2cs_decode(i2cs_final[0])}")
|
|
if i2cs_final[0] == 0x00:
|
|
print(f" ✓ I2C controller recovered!")
|
|
elif not (i2cs_final[0] & I2CS_BERR):
|
|
print(f" ~ I2C controller not idle but no BERR — STOP may be latched")
|
|
else:
|
|
print(f" ✗ BERR persists — host-side recovery did not work")
|
|
|
|
|
|
# -- Subcommand handlers --
|
|
|
|
def cmd_load(args):
|
|
"""Load firmware into FX2 RAM."""
|
|
if not os.path.exists(args.file):
|
|
print(f"File not found: {args.file}")
|
|
sys.exit(1)
|
|
|
|
# Parse firmware file
|
|
segments = load_firmware_file(args.file)
|
|
if not segments:
|
|
print("No code segments found in firmware file")
|
|
sys.exit(1)
|
|
|
|
total_bytes = sum(len(d) for _, d in segments)
|
|
min_addr = min(a for a, _ in segments)
|
|
max_addr = max(a + len(d) - 1 for a, d in segments)
|
|
|
|
print(f"SkyWalker-1 RAM Firmware Loader")
|
|
print(f"{'=' * 40}")
|
|
print(f"\nFirmware: {args.file}")
|
|
print(f" Segments: {len(segments)}")
|
|
print(f" Total size: {total_bytes} bytes")
|
|
print(f" Address: 0x{min_addr:04X} - 0x{max_addr:04X}")
|
|
|
|
# Check for CPUCS region overlap (warn but don't block)
|
|
for addr, data in segments:
|
|
seg_end = addr + len(data) - 1
|
|
if addr <= CPUCS_ADDR <= seg_end:
|
|
print(f"\n WARNING: Segment at 0x{addr:04X}-0x{seg_end:04X} "
|
|
f"overlaps CPUCS (0x{CPUCS_ADDR:04X})")
|
|
print(f" The CPU halt/start writes to 0xE600 will clobber "
|
|
f"this region")
|
|
|
|
print()
|
|
|
|
# Connect
|
|
dev = find_device(force=args.force)
|
|
|
|
# Check VID/PID if it's not a known device
|
|
vid = dev.idVendor
|
|
pid = dev.idProduct
|
|
is_skywalker = (vid == SKYWALKER_VID and pid == SKYWALKER_PID)
|
|
is_bare_fx2 = (vid == CYPRESS_VID and pid == CYPRESS_PID)
|
|
|
|
if not is_skywalker and not is_bare_fx2 and not args.force:
|
|
print(f"\n Unknown device VID 0x{vid:04X} PID 0x{pid:04X}")
|
|
print(f" Expected SkyWalker-1 (09C0:0203) or bare FX2 (04B4:8613)")
|
|
print(f" Use --force to override")
|
|
sys.exit(1)
|
|
|
|
intf = detach_driver(dev)
|
|
|
|
try:
|
|
# Step 1: Halt CPU
|
|
if not args.no_reset:
|
|
if args.settle_delay > 0:
|
|
print(f"\n Settle delay: waiting {args.settle_delay}s for stock "
|
|
f"firmware I2C to finish...")
|
|
time.sleep(args.settle_delay)
|
|
print(f" Settle complete.")
|
|
|
|
# Pre-halt I2C flush: send vendor requests to the stock firmware
|
|
# that trigger I2C operations. After the firmware completes the
|
|
# I2C transaction (including STOP), the controller should be idle.
|
|
# We then immediately halt before any new I2C operation starts.
|
|
#
|
|
# Try multiple approaches — the stock firmware may support
|
|
# different subsets of the gp8psk vendor request protocol.
|
|
print("\n Pre-halt I2C flush...")
|
|
i2c_flushed = False
|
|
# Approach 1: GET_SIGNAL_LOCK (0x90) — reads BCM4500 via I2C
|
|
try:
|
|
lock_data = dev.ctrl_transfer(
|
|
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
|
|
0x90, 0, 0, 1, 2000)
|
|
print(f" 0x90 GET_SIGNAL_LOCK: 0x{lock_data[0]:02X} (I2C to BCM4500)")
|
|
i2c_flushed = True
|
|
except usb.core.USBError:
|
|
print(f" 0x90 not supported")
|
|
# Approach 2: I2C_READ (0x84) with shifted address
|
|
if not i2c_flushed:
|
|
try:
|
|
eeprom = dev.ctrl_transfer(
|
|
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
|
|
0x84, 0xA2, 0, 1, 2000) # addr<<1 per gp8psk
|
|
print(f" 0x84 I2C_READ: 0x{eeprom[0]:02X} (EEPROM)")
|
|
i2c_flushed = True
|
|
except usb.core.USBError:
|
|
print(f" 0x84 not supported")
|
|
# Approach 3: GET_8PSK_CONFIG (0x80) — may trigger I2C indirectly
|
|
if not i2c_flushed:
|
|
try:
|
|
cfg = dev.ctrl_transfer(
|
|
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
|
|
0x80, 0, 0, 1, 2000)
|
|
print(f" 0x80 GET_8PSK_CONFIG: 0x{cfg[0]:02X}")
|
|
i2c_flushed = True
|
|
except usb.core.USBError:
|
|
print(f" 0x80 not supported")
|
|
if not i2c_flushed:
|
|
print(f" No stock vendor requests succeeded — halt may catch mid-I2C")
|
|
|
|
print("\n[1/3] Halting CPU (CPUCS = 0x01)...")
|
|
cpu_halt(dev)
|
|
time.sleep(0.01) # minimal delay — I2C should be idle from flush
|
|
|
|
# Verify halt
|
|
readback = fx2_ram_read(dev, CPUCS_ADDR, 1)
|
|
if readback and readback[0] & 0x01:
|
|
print(" CPU halted")
|
|
else:
|
|
val = f"0x{readback[0]:02X}" if readback else "read failed"
|
|
print(f" WARNING: CPUCS readback = {val} (expected 0x01)")
|
|
print(" Proceeding anyway...")
|
|
else:
|
|
print("\n[1/3] Skipping CPU reset (--no-reset)")
|
|
|
|
# Step 1.5: I2C controller cleanup (two-stage boot)
|
|
# The stock firmware's I2C polling is almost certainly interrupted
|
|
# by our CPUCS halt. The I2C controller runs independently — it
|
|
# enters a stuck state that causes BERR (I2CS=0xF6) on CPU restart.
|
|
# Must run BEFORE firmware load since the cleanup stub uses 0x0000.
|
|
if not args.no_reset:
|
|
i2c_cleanup(dev)
|
|
|
|
# Step 2: Load segments (CPU is halted from cleanup or step 1)
|
|
step = "2/3" if not args.no_reset else "2/2"
|
|
print(f"\n[{step}] Loading {len(segments)} segment(s) into RAM...")
|
|
written = write_segments(dev, segments, verbose=args.verbose)
|
|
print(f"\n {written} bytes loaded")
|
|
|
|
if written != total_bytes:
|
|
print(f" WARNING: expected {total_bytes}, wrote {written}")
|
|
|
|
# Step 3: Start CPU
|
|
if not args.no_reset:
|
|
print(f"\n[3/3] Starting CPU (CPUCS = 0x00)...")
|
|
cpu_start(dev)
|
|
print(" CPU released")
|
|
print(f"\n Firmware is running. The device will re-enumerate")
|
|
print(f" with new USB descriptors if the firmware does so.")
|
|
|
|
if args.wait:
|
|
_wait_for_reenumeration(args.wait)
|
|
else:
|
|
print(f"\n Segments loaded (CPU not reset)")
|
|
|
|
finally:
|
|
# Only re-attach if we didn't just start new firmware
|
|
# (the device may have already re-enumerated away)
|
|
if args.no_reset and intf is not None:
|
|
try:
|
|
usb.util.release_interface(dev, intf)
|
|
dev.attach_kernel_driver(intf)
|
|
print("\nRe-attached kernel driver")
|
|
except:
|
|
pass
|
|
|
|
|
|
def _wait_for_reenumeration(timeout):
|
|
"""Wait for a USB device to re-appear after firmware load."""
|
|
print(f"\n Waiting up to {timeout}s for re-enumeration...")
|
|
deadline = time.time() + timeout
|
|
time.sleep(1.0) # Give the device a moment to disconnect
|
|
|
|
while time.time() < deadline:
|
|
# Check for SkyWalker-1 with potentially new VID/PID
|
|
# After loading custom firmware, VID/PID may differ
|
|
dev = usb.core.find(idVendor=SKYWALKER_VID, idProduct=SKYWALKER_PID)
|
|
if dev is not None:
|
|
print(f" Device re-appeared: Bus {dev.bus} Addr {dev.address} "
|
|
f"(0x{SKYWALKER_VID:04X}:0x{SKYWALKER_PID:04X})")
|
|
return
|
|
|
|
dev = usb.core.find(idVendor=CYPRESS_VID, idProduct=CYPRESS_PID)
|
|
if dev is not None:
|
|
print(f" Device re-appeared: Bus {dev.bus} Addr {dev.address} "
|
|
f"(0x{CYPRESS_VID:04X}:0x{CYPRESS_PID:04X})")
|
|
return
|
|
|
|
print(".", end="", flush=True)
|
|
time.sleep(0.5)
|
|
|
|
print(f"\n Timeout -- device did not re-enumerate within {timeout}s")
|
|
print(f" The firmware may use different VID/PID. Check 'lsusb'.")
|
|
|
|
|
|
def cmd_reset(args):
|
|
"""Reset the FX2 CPU (halt then start)."""
|
|
print(f"SkyWalker-1 CPU Reset")
|
|
print(f"{'=' * 40}")
|
|
|
|
dev = find_device(force=args.force)
|
|
intf = detach_driver(dev)
|
|
|
|
try:
|
|
print("\nHalting CPU...")
|
|
cpu_halt(dev)
|
|
time.sleep(0.05)
|
|
print(" CPUCS = 0x01 (halted)")
|
|
|
|
time.sleep(0.1)
|
|
|
|
print("Starting CPU...")
|
|
cpu_start(dev)
|
|
print(" CPUCS = 0x00 (running)")
|
|
print("\nCPU reset complete. Device will re-enumerate.")
|
|
|
|
if args.wait:
|
|
_wait_for_reenumeration(args.wait)
|
|
finally:
|
|
pass # Device is likely gone after reset
|
|
|
|
|
|
def cmd_read(args):
|
|
"""Read and hex-dump FX2 RAM contents."""
|
|
addr = args.addr
|
|
length = args.length
|
|
|
|
print(f"SkyWalker-1 RAM Read")
|
|
print(f"{'=' * 40}")
|
|
|
|
dev = find_device(force=args.force)
|
|
intf = detach_driver(dev)
|
|
|
|
try:
|
|
print(f"\nReading {length} bytes from 0x{addr:04X}...\n")
|
|
|
|
data = bytearray()
|
|
offset = 0
|
|
errors = 0
|
|
|
|
while offset < length:
|
|
chunk_len = min(CHUNK_SIZE, length - offset)
|
|
chunk = fx2_ram_read(dev, addr + offset, chunk_len)
|
|
if chunk is None:
|
|
errors += 1
|
|
data.extend(b'\xff' * chunk_len)
|
|
else:
|
|
data.extend(chunk)
|
|
offset += chunk_len
|
|
|
|
# Hex dump output
|
|
for i in range(0, len(data), 16):
|
|
row = data[i:i + 16]
|
|
hex_part = ' '.join(f'{b:02X}' for b in row)
|
|
ascii_part = ''.join(chr(b) if 0x20 <= b < 0x7F else '.' for b in row)
|
|
print(f" {addr + i:04X}: {hex_part:<48s} {ascii_part}")
|
|
|
|
print(f"\n {len(data)} bytes read, {errors} chunk errors")
|
|
|
|
if args.output:
|
|
with open(args.output, 'wb') as f:
|
|
f.write(data)
|
|
print(f" Saved to: {args.output}")
|
|
|
|
finally:
|
|
if intf is not None:
|
|
try:
|
|
usb.util.release_interface(dev, intf)
|
|
dev.attach_kernel_driver(intf)
|
|
print("\nRe-attached kernel driver")
|
|
except:
|
|
print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="SkyWalker-1 RAM firmware loader (FX2 vendor request 0xA0)",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""\
|
|
examples:
|
|
%(prog)s load firmware.ihx
|
|
%(prog)s load firmware.bix --wait 5
|
|
%(prog)s load firmware.ihx --no-reset
|
|
%(prog)s reset
|
|
%(prog)s read --addr 0x0000 --len 256
|
|
%(prog)s read --addr 0xe600 --len 1
|
|
|
|
This tool loads firmware into RAM only -- the EEPROM is never touched.
|
|
Power-cycle the device to restore the factory-programmed firmware.
|
|
""")
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
help="Show detailed transfer progress")
|
|
parser.add_argument('--force', action='store_true',
|
|
help="Allow loading to unknown VID/PID devices")
|
|
|
|
sub = parser.add_subparsers(dest='command')
|
|
|
|
# load (default)
|
|
p_load = sub.add_parser('load',
|
|
help='Load firmware into FX2 RAM')
|
|
p_load.add_argument('file', help='Firmware file (.ihx, .hex, .bix, .bin)')
|
|
p_load.add_argument('--no-reset', action='store_true',
|
|
help="Load without halting/starting the CPU")
|
|
p_load.add_argument('--wait', type=float, default=0, metavar='SECONDS',
|
|
help="Wait for USB re-enumeration after load")
|
|
p_load.add_argument('-v', '--verbose', action='store_true',
|
|
help="Show detailed transfer progress")
|
|
p_load.add_argument('--force', action='store_true',
|
|
help="Allow loading to unknown VID/PID devices")
|
|
p_load.add_argument('--settle-delay', type=float, default=0, metavar='SECONDS',
|
|
help="Wait N seconds before halting CPU (lets stock firmware "
|
|
"finish I2C init — may avoid I2C BERR on restart)")
|
|
|
|
# reset
|
|
p_reset = sub.add_parser('reset',
|
|
help='Reset the FX2 CPU (halt then start)')
|
|
p_reset.add_argument('--wait', type=float, default=0, metavar='SECONDS',
|
|
help="Wait for USB re-enumeration after reset")
|
|
p_reset.add_argument('--force', action='store_true',
|
|
help="Allow reset on unknown VID/PID devices")
|
|
|
|
# read
|
|
p_read = sub.add_parser('read',
|
|
help='Read and hex-dump FX2 RAM')
|
|
p_read.add_argument('--addr', type=lambda x: int(x, 0), default=0x0000,
|
|
help="Start address (default: 0x0000)")
|
|
p_read.add_argument('--len', dest='length', type=lambda x: int(x, 0),
|
|
default=256,
|
|
help="Number of bytes to read (default: 256)")
|
|
p_read.add_argument('-o', '--output', metavar='FILE',
|
|
help="Save raw bytes to file")
|
|
p_read.add_argument('--force', action='store_true',
|
|
help="Allow read on unknown VID/PID devices")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Default to 'load' if a positional arg is given but no subcommand
|
|
if not args.command:
|
|
parser.print_help()
|
|
sys.exit(0)
|
|
|
|
# Propagate top-level flags to subcommands
|
|
if hasattr(args, 'verbose') and not args.verbose:
|
|
args.verbose = parser.parse_args().verbose
|
|
if hasattr(args, 'force') and not args.force:
|
|
args.force = parser.parse_args().force
|
|
|
|
dispatch = {
|
|
'load': cmd_load,
|
|
'reset': cmd_reset,
|
|
'read': cmd_read,
|
|
}
|
|
|
|
handler = dispatch.get(args.command)
|
|
if handler is None:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
handler(args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|