One-off diagnostic scripts from experiments 0xD7-0xDB investigating the I2C BERR deadlock. They document the systematic elimination of software-only recovery approaches:
- i2c_host_test.py: Proved 0xA0 register writes cannot drive the I2C bus
- i2c_register_test.py: Tested I2C register writability from the host
- i2c_recovery_boot.py: Attempted I2C state machine recovery via boot
- eeprom_flash_a0.py: Host-side EEPROM flash attempt (failed)
- boot_ab_test.py / boot_test.py: EEPROM boot reliability testing
- a8_autoclear_test.py: BCM4500 command register auto-clear behavior
- addr_gateway_test.py: BCM3440 gateway address routing analysis
- stock_fw_compare.py / stock_fw_test.py: Stock vs custom firmware analysis
737 lines
24 KiB
Python
737 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
EEPROM flash via host-side I2C orchestration (boot ROM 0xA0).

Writes C2 firmware images to the SkyWalker-1 EEPROM by driving the
FX2LP's I2C controller from the host while the CPU is halted.

Background:
  - Stock firmware I2C proxy (0x83/0x84) returns pipe errors
  - Custom firmware via RAM: I2CS BERR (0xF6) on CPUCS restart
  - Pre-halt I2C flush (0x90) + halt → I2CS = 0x01 (clean/idle)
  - Boot ROM 0xA0 can read/write I2C registers at 0xE678-0xE67A
  - We drive the I2C controller from the host, one register write at
    a time, to program the EEPROM

Strategy:
  1. Pre-halt flush: GET_SIGNAL_LOCK (0x90) finishes stock firmware I2C
  2. Halt CPU: CPUCS = 0x01 (I2C controller stays idle)
  3. Write I2C registers via 0xA0 to orchestrate EEPROM write
  4. Power-cycle to boot from new EEPROM image
"""
|
|
import usb.core, usb.util, sys, time, os, subprocess, argparse
|
|
|
|
# USB vendor / product IDs of the SkyWalker-1
SKYWALKER_VID = 0x09C0
SKYWALKER_PID = 0x0203

# FX2LP register addresses (XDATA space)
CPUCS_ADDR = 0xE600
I2CS_ADDR = 0xE678
I2DAT_ADDR = 0xE679
I2CTL_ADDR = 0xE67A

# I2CS bit masks, listed from bit 7 down to bit 0
bmSTART = 1 << 7
bmSTOP = 1 << 6
bmLASTRD = 1 << 5
bmID1 = 1 << 4
bmID0 = 1 << 3
bmBERR = 1 << 2
bmACK = 1 << 1
bmDONE = 1 << 0

# EEPROM parameters (24C128)
EEPROM_I2C_ADDR = 0x51        # 7-bit address
EEPROM_PAGE_SIZE = 64         # bytes per page write
EEPROM_SIZE = 16 * 1024       # 16KB total
EEPROM_WRITE_MS = 5           # max internal write cycle time

# Boot ROM vendor request
A0_REQUEST = 0xA0
|
|
|
|
|
|
def i2cs_str(val):
    """Format an I2CS register value for display.

    The result contains the raw value in hex, the names of the set flag
    bits (or ``clear`` when none are set), and a state label decoded from
    the two ID bits (bits 4:3).
    """
    named_bits = (
        (bmSTART, 'START'),
        (bmSTOP, 'STOP'),
        (bmLASTRD, 'LASTRD'),
        (bmBERR, 'BERR'),
        (bmACK, 'ACK'),
        (bmDONE, 'DONE'),
    )
    active = [label for mask, label in named_bits if val & mask]
    flag_text = ' '.join(active) if active else 'clear'
    # ID1:ID0 select one of four state labels.
    state = ('idle', 'data', 'addr-wait', 'busy')[(val >> 3) & 0x03]
    return f"0x{val:02X} [{flag_text}] state={state}"
|
|
|
|
|
|
def find_device():
    """Return the SkyWalker-1 USB device, exiting with status 1 if absent."""
    device = usb.core.find(idVendor=SKYWALKER_VID, idProduct=SKYWALKER_PID)
    if device is not None:
        print(f" Device: Bus {device.bus} Addr {device.address}")
        return device
    print("ERROR: SkyWalker-1 not found on USB")
    sys.exit(1)
|
|
|
|
|
|
def detach_driver(dev):
    """Detach any kernel driver bound to the device, then configure it.

    Every interface of every configuration is checked. If a bound kernel
    driver cannot be detached the process exits with status 1, after
    printing a hint. Selecting the configuration afterwards is best effort:
    a USB error there is reported but does not abort.
    """
    for cfg in dev:
        for intf in cfg:
            if dev.is_kernel_driver_active(intf.bInterfaceNumber):
                try:
                    dev.detach_kernel_driver(intf.bInterfaceNumber)
                except usb.core.USBError as e:
                    print(f" Cannot detach driver: {e}")
                    print(" Try: sudo modprobe -r dvb_usb_gp8psk")
                    sys.exit(1)
    try:
        dev.set_configuration()
    except usb.core.USBError as e:
        # Only USB errors are tolerated here; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        print(f" set_configuration failed (continuing): {e}")
|
|
|
|
|
|
# ── FX2 register access via 0xA0 ─────────────────────────────────
|
|
|
|
def a0_read(dev, addr, length=1):
    """Read ``length`` bytes starting at XDATA address ``addr``.

    Issues a vendor IN control transfer with request 0xA0 (2000 ms timeout)
    and returns the reply as ``bytes``.
    """
    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN
    reply = dev.ctrl_transfer(request_type, A0_REQUEST, addr, 0, length, 2000)
    return bytes(reply)
|
|
|
|
|
|
def a0_write(dev, addr, data):
    """Write ``data`` to XDATA address ``addr`` via vendor request 0xA0.

    ``data`` may be a single int, which is sent as one byte, or a bytes-like
    payload, which is sent unchanged. The transfer uses a 2000 ms timeout.
    """
    payload = bytes([data]) if isinstance(data, int) else data
    request_type = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT
    dev.ctrl_transfer(request_type, A0_REQUEST, addr, 0, payload, 2000)
|
|
|
|
|
|
# ── Host-side I2C primitives ─────────────────────────────────────
|
|
|
|
def i2c_read_status(dev):
    """Return the current value of the I2CS register as an int."""
    raw = a0_read(dev, I2CS_ADDR, 1)
    return raw[0]
|
|
|
|
|
|
def i2c_wait_done(dev, timeout_ms=50):
    """Poll I2CS until DONE or BERR is seen, or the timeout expires.

    Args:
        dev: USB device handle passed through to ``i2c_read_status``.
        timeout_ms: Maximum time to keep polling, in milliseconds.

    Returns:
        ``(success, i2cs_value)``. ``success`` is True only when DONE was
        observed; BERR or a timeout gives False. ``i2cs_value`` is the last
        status read.

    The register is always read at least once, so a zero or negative
    timeout still returns a real status value. The deadline uses the
    monotonic clock so wall-clock adjustments cannot shorten or stretch it.
    """
    deadline = time.monotonic() + timeout_ms / 1000.0
    while True:
        val = i2c_read_status(dev)
        if val & bmDONE:
            return True, val
        if val & bmBERR:
            return False, val
        if time.monotonic() >= deadline:
            return False, val
        time.sleep(0.001)  # 1ms between polls
|
|
|
|
|
|
def i2c_start(dev):
    """Request a START condition by writing the START bit to I2CS."""
    a0_write(dev, addr=I2CS_ADDR, data=bmSTART)
|
|
|
|
|
|
def i2c_stop(dev):
    """Request a STOP condition and wait for the STOP bit to clear.

    Returns:
        ``(success, i2cs_value)``. ``success`` is True when the STOP bit
        read back as clear within about 50 ms; ``i2cs_value`` is the last
        status read.

    The deadline uses the monotonic clock so wall-clock adjustments during
    a long flash cannot cause a spurious timeout.
    """
    a0_write(dev, I2CS_ADDR, bmSTOP)
    deadline = time.monotonic() + 0.050
    while True:
        val = i2c_read_status(dev)
        if not (val & bmSTOP):
            return True, val
        if time.monotonic() >= deadline:
            return False, val
        time.sleep(0.001)
|
|
|
|
|
|
def i2c_write_byte(dev, byte_val):
    """Send one byte through I2DAT and report whether it was acknowledged.

    Returns ``(ack, i2cs)``. ``ack`` is False when the transfer did not
    complete, otherwise it reflects the ACK bit of the final status.
    """
    a0_write(dev, I2DAT_ADDR, byte_val)
    completed, i2cs = i2c_wait_done(dev)
    acked = completed and bool(i2cs & bmACK)
    return acked, i2cs
|
|
|
|
|
|
def i2c_read_byte(dev, last=False):
    """Read I2DAT once and wait for the controller to report DONE.

    When ``last`` is true, LASTRD is written to I2CS before the read so the
    byte is NACKed. Returns ``(value, ok, i2cs)``: the byte read from
    I2DAT, whether DONE was seen, and the final status.
    """
    if last:
        a0_write(dev, I2CS_ADDR, bmLASTRD)
    # The I2DAT read itself is what starts the next SCL clock cycle.
    byte = a0_read(dev, I2DAT_ADDR, 1)[0]
    done, i2cs = i2c_wait_done(dev)
    return byte, done, i2cs
|
|
|
|
|
|
# ── Higher-level I2C operations ──────────────────────────────────
|
|
|
|
def i2c_probe(dev, addr_7bit):
    """Address a device for writing and return True if it ACKs.

    The transaction is START, address byte (7-bit address shifted left,
    write direction), STOP.
    """
    i2c_start(dev)
    acked, _ = i2c_write_byte(dev, addr_7bit << 1)
    i2c_stop(dev)
    return acked
|
|
|
|
|
|
def eeprom_write_page(dev, mem_addr, data):
    """Write up to EEPROM_PAGE_SIZE bytes to the EEPROM at ``mem_addr``.

    EEPROM protocol: START + slave_W + addr_H + addr_L + data... + STOP,
    followed by ACK polling until the internal write cycle finishes.

    Args:
        dev: USB device handle.
        mem_addr: EEPROM byte address of the first byte to write.
        data: Bytes to write; must fit inside a single EEPROM page.

    Returns:
        ``(ok, message)``: ``(True, "ok")`` on success, otherwise False
        with a description of the step that failed.

    Raises:
        ValueError: if ``data`` is longer than a page, or if the write
            would run past the end of the page containing ``mem_addr``.
    """
    if len(data) > EEPROM_PAGE_SIZE:
        raise ValueError(f"Page write max {EEPROM_PAGE_SIZE} bytes, got {len(data)}")

    # A page write that runs past the page end wraps around inside the same
    # page and overwrites its start, so refuse it up front.
    page_offset = mem_addr % EEPROM_PAGE_SIZE
    if page_offset + len(data) > EEPROM_PAGE_SIZE:
        raise ValueError(
            f"Write of {len(data)} bytes at 0x{mem_addr:04X} crosses a "
            f"{EEPROM_PAGE_SIZE}-byte page boundary")

    # Address phase: slave address (write), then the 16-bit memory address.
    address_phase = (
        (EEPROM_I2C_ADDR << 1, "slave address"),
        ((mem_addr >> 8) & 0xFF, "addr_H"),
        (mem_addr & 0xFF, "addr_L"),
    )

    i2c_start(dev)
    for out_byte, label in address_phase:
        ack, _ = i2c_write_byte(dev, out_byte)
        if not ack:
            i2c_stop(dev)
            return False, f"no ACK on {label}"

    # Data bytes
    for i, byte_val in enumerate(data):
        ack, _ = i2c_write_byte(dev, byte_val)
        if not ack:
            i2c_stop(dev)
            return False, f"no ACK on data byte {i} (0x{byte_val:02X})"

    # STOP (initiates EEPROM internal write cycle)
    i2c_stop(dev)

    # Wait for the write cycle via ACK polling: the EEPROM NACKs its
    # address during the write cycle, then ACKs when the cycle completes.
    # Give up after 20ms, measured on the monotonic clock.
    deadline = time.monotonic() + 0.020
    while True:
        time.sleep(0.001)
        i2c_start(dev)
        ack, _ = i2c_write_byte(dev, EEPROM_I2C_ADDR << 1)
        i2c_stop(dev)
        if ack:
            return True, "ok"
        if time.monotonic() >= deadline:
            return False, "write cycle timeout (no ACK after 20ms)"
|
|
|
|
|
|
def eeprom_read_bytes(dev, mem_addr, length):
    """Read ``length`` bytes from the EEPROM starting at ``mem_addr``.

    Protocol: START + slave_W + addr_H + addr_L +
              rSTART + slave_R + data[0] ... data[n-1] + NACK + STOP

    Returns ``(data, message)``:
      * ``(None, reason)`` when an address-phase byte is not ACKed,
      * ``(partial_bytes, "read failed at byte N")`` when a data read does
        not complete (the bytes gathered so far are returned),
      * ``(bytes, "ok")`` on success.

    NOTE(review): every I2DAT read after the address phase, including the
    first, is stored as data, and one extra read is discarded at the end.
    Confirm against the FX2 I2C read procedure whether the first read is a
    dummy, which would shift the result by one byte.
    """
    # Write phase: set the EEPROM's internal address pointer.
    pointer_setup = (
        (EEPROM_I2C_ADDR << 1, "no ACK on slave address (write phase)"),
        ((mem_addr >> 8) & 0xFF, "no ACK on addr_H"),
        (mem_addr & 0xFF, "no ACK on addr_L"),
    )
    i2c_start(dev)
    for out_byte, failure in pointer_setup:
        acked, _ = i2c_write_byte(dev, out_byte)
        if not acked:
            i2c_stop(dev)
            return None, failure

    # Read phase: repeated START, then the slave address with the R bit set.
    i2c_start(dev)
    acked, _ = i2c_write_byte(dev, (EEPROM_I2C_ADDR << 1) | 1)
    if not acked:
        i2c_stop(dev)
        return None, "no ACK on slave address (read phase)"

    received = bytearray()
    final_index = length - 1
    for index in range(length):
        value, completed, _ = i2c_read_byte(dev, last=(index == final_index))
        if not completed:
            i2c_stop(dev)
            return bytes(received), f"read failed at byte {index}"
        received.append(value)

    # Dummy read to complete the last byte cycle, then release the bus.
    a0_read(dev, I2DAT_ADDR, 1)
    i2c_stop(dev)

    return bytes(received), "ok"
|
|
|
|
|
|
# ── Device preparation ───────────────────────────────────────────
|
|
|
|
def prepare_device(dev, verbose=False):
    """Flush and halt the FX2, then check that host-driven I2C is usable.

    Steps, in order: send vendor request 0x90 (falling back to 0x80) as a
    pre-halt I2C flush, halt the CPU by writing CPUCS = 0x01, reject a
    controller that reports BERR, select 400kHz if it is not already set,
    and probe the known bus addresses. With ``verbose`` set and an EEPROM
    present, the first 8 EEPROM bytes are also read back as a read test.

    Returns True if the I2C controller is ready for host-side operations,
    False otherwise (the reason is printed).
    """
    rule = " " + "─" * 40
    vendor_in = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN
    eeprom_addrs = (0x50, 0x51)

    print("\n Phase 1: Prepare I2C controller")
    print(rule)

    # Step 1: pre-halt I2C flush, with one fallback request.
    print(" Sending GET_SIGNAL_LOCK (0x90) to flush I2C...")
    try:
        reply = dev.ctrl_transfer(vendor_in, 0x90, 0, 0, 1, 2000)
        print(f" Response: 0x{reply[0]:02X}")
    except usb.core.USBError as flush_err:
        print(f" 0x90 failed: {flush_err}")
        try:
            reply = dev.ctrl_transfer(vendor_in, 0x80, 0, 0, 1, 2000)
            print(f" Fallback 0x80 response: 0x{reply[0]:02X}")
        except usb.core.USBError:
            print(" No flush commands worked — proceeding anyway")

    # Step 2: halt the CPU and confirm via readback.
    print(" Halting CPU (CPUCS = 0x01)...")
    a0_write(dev, CPUCS_ADDR, 0x01)
    time.sleep(0.010)

    cpucs = a0_read(dev, CPUCS_ADDR, 1)[0]
    if cpucs & 0x01 == 0:
        print(f" CPUCS readback: 0x{cpucs:02X} — halt may have failed")
        return False
    print(f" CPU halted (CPUCS = 0x{cpucs:02X})")

    # Step 3: inspect the I2C controller state.
    status = i2c_read_status(dev)
    speed_reg = a0_read(dev, I2CTL_ADDR, 1)[0]
    speed_label = '400kHz' if speed_reg & 1 else '100kHz'
    print(f" I2CS: {i2cs_str(status)}")
    print(f" I2CTL: 0x{speed_reg:02X} ({speed_label})")

    if status & bmBERR:
        print(" BERR is set — I2C controller is stuck")
        print(" This usually means the CPU was restarted after halt")
        return False

    # Step 4: switch to 400kHz unless it is already selected.
    if speed_reg & 0x01 == 0:
        print(" Setting I2CTL = 0x01 (400kHz)...")
        a0_write(dev, I2CTL_ADDR, 0x01)
        readback = a0_read(dev, I2CTL_ADDR, 1)[0]
        print(f" I2CTL readback: 0x{readback:02X}")

    # Step 5: quick bus probe of the expected devices.
    print("\n Phase 2: I2C bus probe")
    print(rule)

    targets = (
        (0x51, "EEPROM (24C128)"),
        (0x50, "EEPROM alt addr"),
        (0x08, "BCM4500 demod"),
        (0x10, "BCM3440 tuner"),
    )

    found_eeprom = False
    for addr, name in targets:
        acked = i2c_probe(dev, addr)
        status_after = i2c_read_status(dev)
        is_eeprom_hit = acked and addr in eeprom_addrs
        outcome = "ACK" if acked else "NACK"
        marker = " <--" if is_eeprom_hit else ""
        print(f" 0x{addr:02X} {name:20s}: {outcome:4s} "
              f"(I2CS={i2cs_str(status_after)}){marker}")
        if is_eeprom_hit:
            found_eeprom = True

        # A bus error after any probe means host-side I2C is unusable.
        if status_after & bmBERR:
            print(" BERR after probe — host-side I2C may not work")
            return False

    # Optional read test: fetch a few bytes to prove reads work.
    if verbose and found_eeprom:
        print("\n Phase 2b: EEPROM read test")
        print(rule)
        data, msg = eeprom_read_bytes(dev, 0x0000, 8)
        if not data:
            print(f" Read failed: {msg}")
            return False
        print(f" EEPROM[0x0000..0x0007]: {data.hex(' ')}")
        if data[0] == 0xC2:
            print(" First byte is 0xC2 — valid C2 boot header!")
        else:
            print(f" First byte is 0x{data[0]:02X} — not a C2 header")

    if not found_eeprom:
        print(" No EEPROM found at 0x50 or 0x51")
        return False

    print("\n I2C controller ready for EEPROM operations")
    return True
|
|
|
|
|
|
# ── Subcommands ──────────────────────────────────────────────────
|
|
|
|
def cmd_probe(args):
    """Run the ``probe`` subcommand: check that host-side I2C works.

    Finds the device, detaches any kernel driver and runs the verbose
    preparation sequence. Exits with status 1 when preparation fails.
    """
    print("SkyWalker-1 Host-Side I2C Probe (0xA0)")
    print("=" * 45)

    dev = find_device()
    detach_driver(dev)

    if prepare_device(dev, verbose=True):
        print("\n SUCCESS: Host-side I2C is working!")
        print(" EEPROM detected and readable via boot ROM 0xA0")
        return

    print("\n FAIL: Host-side I2C does not work")
    print(" The 0xA0 vendor request may not trigger I2C hardware,")
    print(" or the I2C bus is in a bad state.")
    sys.exit(1)
|
|
|
|
|
|
def cmd_read(args):
    """Run the ``read`` subcommand: dump EEPROM contents via host-side I2C.

    Reads ``args.size`` bytes in 32-byte chunks. With ``args.hex_dump`` the
    first 256 bytes are printed as a hex dump; a C2 header is summarised
    when present; with ``args.output`` the data is saved to that file.

    A chunk that fails outright or comes back short is counted as an
    error. Missing bytes are padded with 0xFF so every later byte keeps its
    correct offset in the dump.
    """
    size = args.size
    print("SkyWalker-1 EEPROM Read (host-side I2C)")
    print("=" * 45)

    dev = find_device()
    detach_driver(dev)

    ok = prepare_device(dev, verbose=False)
    if not ok:
        print("\n FAIL: Cannot prepare I2C")
        sys.exit(1)

    print(f"\n Reading {size} bytes from EEPROM...")
    chunk_size = 32  # read in small chunks (each byte = 2+ USB transfers)
    data = bytearray()
    errors = 0

    for offset in range(0, size, chunk_size):
        remaining = min(chunk_size, size - offset)
        chunk, msg = eeprom_read_bytes(dev, offset, remaining)
        if chunk is None or len(chunk) != remaining:
            # eeprom_read_bytes returns the partial bytes on a mid-chunk
            # failure; keep them and pad the rest to preserve alignment.
            print(f"\n Read failed at 0x{offset:04X}: {msg}")
            errors += 1
            received = chunk or b''
            data.extend(received)
            data.extend(b'\xff' * (remaining - len(received)))
        else:
            data.extend(chunk)

        if offset % 256 == 0 or offset + remaining >= size:
            pct = (offset + remaining) * 100 // size
            print(f"\r Read: 0x{offset + remaining:04X}/{size:04X} [{pct:3d}%]",
                  end="", flush=True)

    print()

    if errors:
        print(f"\n {errors} read error(s)")

    # Hex dump
    if args.hex_dump:
        shown = min(256, len(data))
        print(f"\n EEPROM contents (first {shown} bytes):")
        for i in range(0, shown, 16):
            row = data[i:i + 16]
            hex_part = ' '.join(f'{b:02X}' for b in row)
            ascii_part = ''.join(
                chr(b) if 0x20 <= b < 0x7F else '.' for b in row)
            print(f" {i:04X}: {hex_part:<48s} {ascii_part}")

    # Check C2 header (VID and PID are stored little-endian)
    if len(data) >= 8 and data[0] == 0xC2:
        vid = data[2] << 8 | data[1]
        pid = data[4] << 8 | data[3]
        print(f"\n C2 header: VID=0x{vid:04X} PID=0x{pid:04X} "
              f"CONFIG=0x{data[7]:02X}")

    if args.output:
        with open(args.output, 'wb') as f:
            f.write(data)
        print(f"\n Saved to: {args.output}")
|
|
|
|
|
|
def cmd_write(args):
    """Run the ``write`` subcommand: flash a C2 image and verify it.

    Sequence: load and validate the image (at least 8 bytes, first byte
    0xC2, no larger than the EEPROM), prepare the device, optionally read
    the current header, honour ``--dry-run``, give a 3-second abort window,
    write page by page, then read everything back and compare.

    Exits with status 1 when the file is missing or invalid, preparation
    fails, the header read fails without ``--force``, three page writes
    fail, or verification fails. A verify read that fails or comes back
    short counts as a mismatch for the whole chunk, so a failed read can
    never be reported as a successful verify.
    """
    image_path = args.file
    if not os.path.exists(image_path):
        print(f"File not found: {image_path}")
        sys.exit(1)

    with open(image_path, 'rb') as f:
        image = f.read()

    print("SkyWalker-1 EEPROM Flash (host-side I2C)")
    print("=" * 45)

    # Validate image
    if len(image) < 8 or image[0] != 0xC2:
        if image:
            print(f" Not a C2 image (first byte: 0x{image[0]:02X})")
        else:
            # An empty file has no first byte to report.
            print(" Not a C2 image (file is empty)")
        sys.exit(1)
    if len(image) > EEPROM_SIZE:
        print(f" Image too large: {len(image)} > {EEPROM_SIZE}")
        sys.exit(1)

    # VID and PID are stored little-endian in the C2 header.
    vid = image[2] << 8 | image[1]
    pid = image[4] << 8 | image[3]
    config = image[7]
    print(f" Image: {image_path}")
    print(f" Size: {len(image)} bytes ({len(image)*100//EEPROM_SIZE}% of EEPROM)")
    print(f" VID: 0x{vid:04X} PID: 0x{pid:04X} CONFIG: 0x{config:02X}")

    dev = find_device()
    detach_driver(dev)

    ok = prepare_device(dev, verbose=True)
    if not ok:
        print("\n FAIL: Cannot prepare I2C — aborting")
        sys.exit(1)

    # Backup current EEPROM first (just the header to be safe)
    if not args.no_backup:
        print(f"\n Backing up EEPROM header...")
        hdr, msg = eeprom_read_bytes(dev, 0x0000, 8)
        # Require the full 8 bytes: a partial read must not be indexed.
        if hdr is not None and len(hdr) == 8:
            print(f" Current header: {hdr.hex(' ')}")
            if hdr[0] == 0xC2:
                cur_vid = hdr[2] << 8 | hdr[1]
                cur_pid = hdr[4] << 8 | hdr[3]
                print(f" Current: VID=0x{cur_vid:04X} PID=0x{cur_pid:04X}")
        else:
            print(f" Header read failed: {msg}")
            if not args.force:
                print(" Use --force to proceed without backup verification")
                sys.exit(1)

    total_pages = (len(image) + EEPROM_PAGE_SIZE - 1) // EEPROM_PAGE_SIZE

    # Dry run?
    if args.dry_run:
        print(f"\n DRY RUN: would write {len(image)} bytes in {total_pages} pages")
        return

    # Write confirmation
    print(f"\n *** WRITING {len(image)} BYTES TO EEPROM ***")
    print(f" This replaces the boot firmware. A bad write = bricked device.")
    print(f" Press Ctrl+C within 3 seconds to abort.")
    try:
        for i in range(3, 0, -1):
            print(f"\r Starting in {i}... ", end="", flush=True)
            time.sleep(1)
        print("\r Writing now... ")
    except KeyboardInterrupt:
        print("\n Aborted.")
        return

    # Write image in page-sized chunks
    write_errors = 0
    start_time = time.monotonic()

    for page_num in range(total_pages):
        offset = page_num * EEPROM_PAGE_SIZE
        end = min(offset + EEPROM_PAGE_SIZE, len(image))
        chunk = image[offset:end]

        pct = (page_num + 1) * 100 // total_pages
        elapsed = time.monotonic() - start_time
        rate = (offset + len(chunk)) / elapsed if elapsed > 0 else 0
        eta = (len(image) - offset - len(chunk)) / rate if rate > 0 else 0
        print(f"\r Write: 0x{offset:04X}/0x{len(image):04X} "
              f"[{pct:3d}%] {rate:.0f} B/s ETA {eta:.0f}s ",
              end="", flush=True)

        ok, msg = eeprom_write_page(dev, offset, chunk)
        if not ok:
            print(f"\n Write error at 0x{offset:04X}: {msg}")
            write_errors += 1
            if write_errors >= 3:
                print("\n Too many errors — aborting")
                print(" *** EEPROM STATE UNKNOWN ***")
                sys.exit(1)

    elapsed = time.monotonic() - start_time
    print(f"\r Write: 0x{len(image):04X}/0x{len(image):04X} "
          f"[100%] done in {elapsed:.1f}s ")

    if write_errors:
        print(f"\n WARNING: {write_errors} write error(s)")

    # Verify
    print(f"\n Verifying ({len(image)} bytes)...")
    verify_chunk_size = 32
    mismatches = 0
    first_mismatch = None

    for offset in range(0, len(image), verify_chunk_size):
        remaining = min(verify_chunk_size, len(image) - offset)
        chunk, msg = eeprom_read_bytes(dev, offset, remaining)
        if chunk is None or len(chunk) != remaining:
            # A short read would otherwise be silently truncated by zip()
            # below and pass as "verified".
            print(f"\n Verify read failed at 0x{offset:04X}: {msg}")
            if first_mismatch is None:
                first_mismatch = offset
            mismatches += remaining
            continue

        for i, (expected, got) in enumerate(zip(image[offset:offset+remaining], chunk)):
            if expected != got:
                if first_mismatch is None:
                    first_mismatch = offset + i
                mismatches += 1
                if mismatches <= 8:
                    print(f"\n Mismatch at 0x{offset+i:04X}: "
                          f"wrote 0x{expected:02X} read 0x{got:02X}")

        if offset % 256 == 0 or offset + remaining >= len(image):
            pct = (offset + remaining) * 100 // len(image)
            print(f"\r Verify: 0x{offset+remaining:04X}/0x{len(image):04X} "
                  f"[{pct:3d}%]", end="", flush=True)

    print()

    if mismatches == 0:
        print(f"\n VERIFIED: all {len(image)} bytes match")
        print(f" Flash complete in {elapsed:.1f}s")
        print(f"\n Power-cycle the device to boot the new firmware.")
        if args.power_cycle:
            do_power_cycle()
    else:
        print(f"\n VERIFY FAILED: {mismatches} byte(s) differ "
              f"(first at 0x{first_mismatch:04X})")
        if mismatches > 8:
            print(f" ({mismatches - 8} additional mismatches not shown)")
        print(f"\n *** DO NOT POWER CYCLE ***")
        print(f" Re-run this tool to retry, or use an external programmer.")
        sys.exit(1)
|
|
|
|
|
|
def do_power_cycle(hub_location='1-5.4.4', port='3'):
    """Power-cycle the SkyWalker-1 by switching its hub port with uhubctl.

    Args:
        hub_location: Hub location passed to ``uhubctl -l``. The default
            matches the previously hard-coded value.
        port: Hub port passed to ``uhubctl -p``. The default matches the
            previously hard-coded value.

    Turns the port off, waits 2 seconds, turns it back on, waits 3 seconds
    and then looks for the device on USB. Failures are reported by printing;
    nothing is raised for a missing ``uhubctl`` binary or a timed-out
    invocation.
    """
    print(f"\n Power-cycling via uhubctl...")
    base_cmd = ['sudo', 'uhubctl', '-l', str(hub_location), '-p', str(port), '-a']

    def switch(action):
        # Runs one uhubctl action ('off' or 'on') and returns the result.
        return subprocess.run(
            base_cmd + [action],
            capture_output=True, text=True, timeout=10)

    try:
        # Off
        r = switch('off')
        if r.returncode != 0:
            print(f" uhubctl off failed: {r.stderr.strip()}")
            print(f" Manually unplug and replug the device.")
            return
        print(f" Port {port} powered off")
        time.sleep(2)

        # On
        r = switch('on')
        if r.returncode != 0:
            print(f" uhubctl on failed: {r.stderr.strip()}")
            return
        print(f" Port {port} powered on")
        print(f" Waiting for device to enumerate...")

        time.sleep(3)
        dev = usb.core.find(idVendor=SKYWALKER_VID, idProduct=SKYWALKER_PID)
        # usb.core.find returns None when nothing matches.
        if dev is not None:
            print(f" Device found: Bus {dev.bus} Addr {dev.address}")
            print(f" New firmware is running!")
        else:
            print(f" Device not found yet — may need a few more seconds")

    except FileNotFoundError:
        print(f" uhubctl not found — manually power-cycle the device")
    except subprocess.TimeoutExpired:
        print(f" uhubctl timed out")
|
|
|
|
|
|
def cmd_power_cycle(args):
    """Run the ``power-cycle`` subcommand: print a banner, then cycle power."""
    banner = ("SkyWalker-1 Power Cycle", "=" * 45)
    for line in banner:
        print(line)
    do_power_cycle()
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected subcommand.

    Subcommands: ``probe``, ``read``, ``write`` and ``power-cycle``. One of
    them is required.
    """
    parser = argparse.ArgumentParser(
        description="SkyWalker-1 EEPROM flash via host-side I2C (boot ROM 0xA0)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
This tool drives the FX2LP's I2C controller from the host while the
CPU is halted. It works around the stock firmware's broken I2C proxy
and the BERR bug triggered by CPUCS restart.

examples:
%(prog)s probe # test if host-side I2C works
%(prog)s read -o backup.bin # backup current EEPROM
%(prog)s write firmware_eeprom.bin # flash new C2 image
%(prog)s write firmware_eeprom.bin -P # flash + auto power-cycle
%(prog)s power-cycle # just power-cycle the device
""")
    commands = parser.add_subparsers(dest='command', required=True)

    # probe
    commands.add_parser('probe', help='Test host-side I2C bus access')

    # read
    read_parser = commands.add_parser('read', help='Read EEPROM contents')
    read_parser.add_argument('-o', '--output', help='Save to file')
    read_parser.add_argument('--size', type=int, default=EEPROM_SIZE,
                             help=f'Bytes to read (default: {EEPROM_SIZE})')
    read_parser.add_argument('--hex', dest='hex_dump', action='store_true',
                             help='Show hex dump')

    # write
    write_parser = commands.add_parser('write', help='Flash C2 image to EEPROM')
    write_parser.add_argument('file', help='C2 firmware image (.bin)')
    write_parser.add_argument('--dry-run', action='store_true',
                              help='Show what would happen without writing')
    write_parser.add_argument('--no-backup', action='store_true',
                              help='Skip header backup check')
    write_parser.add_argument('--force', action='store_true',
                              help='Continue despite warnings')
    write_parser.add_argument('-P', '--power-cycle', action='store_true',
                              help='Auto power-cycle after successful flash')

    # power-cycle
    commands.add_parser('power-cycle', help='Power-cycle via uhubctl')

    args = parser.parse_args()

    # Map each subcommand name to the function that implements it.
    handlers = {
        'probe': cmd_probe,
        'read': cmd_read,
        'write': cmd_write,
        'power-cycle': cmd_power_cycle,
    }
    handler = handlers[args.command]
    handler(args)
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|