#!/usr/bin/env python3
"""
Hamilton Adversarial Test Suite — SkyWalker-1 v3.05.0

"What happens if the astronaut pushes the wrong button?"

Tests operator error, invalid inputs, state machine violations,
boundary conditions, and rapid-fire stress to verify all safety
fixes from the Phase E Margaret Hamilton review.

The process exit status is 0 when the final verdict is PASS and 1 otherwise.
"""
import sys
import os
import time
import struct

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from skywalker_lib import SkyWalker1
import usb.core

# Human-readable names for the byte returned by the last-error request.
ERR_NAMES = {
    0x00: 'OK',
    0x01: 'I2C_TIMEOUT',
    0x02: 'I2C_NAK',
    0x03: 'BCM_TIMEOUT',
    0x04: 'BCM_NOT_READY',
    0x05: 'BCM_VERIFY',
    0x06: 'TUNE_FAIL',
    0x07: 'EP0_TIMEOUT',
    0x08: 'GPIF_TIMEOUT',
    0x09: 'EP2_TIMEOUT',
    0x0A: 'NOT_SUPPORTED',
    0x0B: 'DISEQC_LEN',
    0x0C: 'DISEQC_TIMER',
    0x0D: 'WDT_FIRED',
}

# Firmware version string the suite treats as "device is alive and sane".
EXPECTED_FW_VERSION = '3.05.0'

# Module-level tallies, updated by test() and by the inline checks below.
passed = 0
failed = 0


def get_err(sw):
    """Return the one-byte last-error code (vendor IN request 0xBC)."""
    return sw.dev.ctrl_transfer(0xC0, 0xBC, 0, 0, 1)[0]


def err_name(code):
    """Return the symbolic name for *code*, or its hex form if unknown."""
    return ERR_NAMES.get(code, f'0x{code:02X}')


def device_alive(sw, expected_version=EXPECTED_FW_VERSION):
    """Return True if the device answers a firmware-version query correctly.

    Any exception raised by the query is treated as "not alive".
    """
    try:
        fw = sw.get_fw_version()
        return fw['version'] == expected_version
    except Exception:
        return False


def test(sw, label, fn, expect_err=None, expect_no_hang=False):
    """Run one adversarial action and classify the outcome.

    Args:
        sw: open device wrapper; must expose ``dev.ctrl_transfer`` and
            ``get_fw_version``.
        label: text printed in the result line.
        fn: zero-argument callable performing the action. Any exception it
            raises is captured and shown in the result line, not propagated.
        expect_err: if not None, the last-error code that must be read back
            after the action for the test to pass.
        expect_no_hang: if true (and ``expect_err`` is None), the test passes
            as long as the device still answers afterwards.

    Returns:
        False if the device stopped responding after the action, else True.
        Note that True is also returned when ``expect_err`` did not match;
        that case is recorded in the ``failed`` counter only.
    """
    global passed, failed

    err_before = get_err(sw)
    usb_err = None
    try:
        fn()
    except Exception as e:  # usb.core.USBError included; both are only reported
        usb_err = e

    # Give the firmware a moment to settle before probing it.
    time.sleep(0.15)

    if not device_alive(sw):
        print(f' [FAIL] {label}: DEVICE DIED!')
        failed += 1
        return False

    err_after = get_err(sw)
    changed = (err_after != err_before)
    suffix = f' (USB: {usb_err})' if usb_err else ''

    if expect_err is not None:
        if err_after == expect_err:
            print(f' [PASS] {label}: err={err_name(expect_err)} as expected{suffix}')
            passed += 1
        else:
            print(f' [FAIL] {label}: expected {err_name(expect_err)}, '
                  f'got {err_name(err_after)}{suffix}')
            failed += 1
    elif expect_no_hang:
        print(f' [PASS] {label}: no hang, err={err_name(err_after)}{suffix}')
        passed += 1
    elif changed:
        # NOTE(review): the collapsed source does not show whether [INFO]
        # results were also counted as passed. They are left uncounted here,
        # matching the print-then-count pattern of the other branches; confirm
        # against the original file.
        print(f' [INFO] {label}: err changed {err_name(err_before)} -> '
              f'{err_name(err_after)}{suffix}')
    else:
        print(f' [PASS] {label}: no new error{suffix}')
        passed += 1
    return True


def _cat1_diseqc_abuse(sw):
    """CAT 1: malformed and unusual DiSEqC / tone-burst requests."""
    print('=== CAT 1: DiSEqC Message Abuse ===')
    print()
    test(sw, '1a. Tone burst B (M3: NOT_SUPPORTED)',
         lambda: sw.send_diseqc_tone_burst(1),
         expect_err=0x0A)
    test(sw, '1b. Tone burst wValue=0xFF',
         lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xFF, 0, None, timeout=3000),
         expect_err=0x0A)
    test(sw, '1c. DiSEqC 2 bytes (too short)',
         lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xE0, 0,
                                      bytes([0xE0, 0x10]), timeout=3000),
         expect_no_hang=True)
    test(sw, '1d. DiSEqC 8 bytes (too long)',
         lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xE0, 0,
                                      bytes([0xE0] * 8), timeout=3000),
         expect_no_hang=True)
    test(sw, '1e. DiSEqC empty payload',
         lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xE0, 0,
                                      bytes([]), timeout=3000),
         expect_no_hang=True)
    test(sw, '1f. Valid 4-byte DiSEqC (recovery)',
         lambda: sw.send_diseqc_message(bytes([0xE0, 0x10, 0x38, 0xF0])),
         expect_no_hang=True)
    test(sw, '1g. DiSEqC 1.2 motor halt (no motor)',
         lambda: sw.send_diseqc_message(bytes([0xE0, 0x31, 0x60])),
         expect_no_hang=True)
    test(sw, '1h. DiSEqC 1.2 drive east 255 steps',
         lambda: sw.send_diseqc_message(bytes([0xE0, 0x31, 0x68, 0xFF])),
         expect_no_hang=True)
    test(sw, '1i. DiSEqC 1.2 USALS GotoX (bogus angle)',
         lambda: sw.send_diseqc_message(bytes([0xE0, 0x31, 0x6E, 0xFF, 0xFF])),
         expect_no_hang=True)
    print()


def _cat2_tune_abuse(sw):
    """CAT 2: out-of-range tune parameters (only test 2a survives, see note)."""
    print('=== CAT 2: Tune Parameter Abuse ===')
    print()
    test(sw, '2a. SR=0',
         lambda: [sw.tune(0, 1000000, 0, 0), time.sleep(0.5)],
         expect_no_hang=True)
    # NOTE(review): the source text is truncated inside test
    # '2b. SR=0xFFFFFFFF' (at the struct.pack format string of a raw 0x86
    # request). That test, the remainder of CAT 2 and all of CAT 3 were lost
    # and are NOT reproduced here. Restore them from version control.


def _cat4_bcm_power_state(sw):
    """CAT 4 (surviving part): requests issued while the BCM4500 is off."""
    global passed, failed

    # NOTE(review): the CAT 4 section banner and anything before this point
    # (e.g. a test 4a) were lost in the source. The print below is rebuilt
    # from its visible tail and the parallel 'Re-booting' message; confirm.
    print(' >> Powering off BCM4500...')
    sw.dev.ctrl_transfer(0xC0, 0x89, 0, 0, 3, timeout=5000)
    time.sleep(0.5)

    test(sw, '4b. Tune with BCM off',
         lambda: [sw.tune(20000000, 1000000, 0, 0), time.sleep(0.5)],
         expect_err=0x04)  # BCM_NOT_READY
    test(sw, '4c. Signal monitor with BCM off',
         lambda: sw.dev.ctrl_transfer(0xC0, 0xB7, 0, 0, 8, timeout=3000),
         expect_no_hang=True)
    test(sw, '4d. I2C bus scan with BCM off',
         lambda: sw.dev.ctrl_transfer(0xC0, 0xB4, 0, 0, 16, timeout=5000),
         expect_no_hang=True)
    test(sw, '4e. Hotplug rescan with BCM off',
         lambda: sw.dev.ctrl_transfer(0xC0, 0xBE, 2, 0, 36, timeout=5000),
         expect_no_hang=True)

    # 4f. Recovery
    print(' >> Re-booting BCM4500...')
    sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000)
    time.sleep(0.5)
    cfg = sw.get_config()
    # Both low config bits must be set (reported as STARTED|FW_LOADED).
    if (cfg & 0x03) == 0x03:
        print(f' [PASS] 4f. Recovery: config=0x{cfg:02X} (STARTED|FW_LOADED)')
        passed += 1
    else:
        print(f' [FAIL] 4f. No recovery: config=0x{cfg:02X}')
        failed += 1

    # 4g. Arm/disarm rapid toggle
    test(sw, '4g. Arm + immediate disarm',
         lambda: [sw.arm_transfer(True), sw.arm_transfer(False)],
         expect_no_hang=True)

    # 4h. Disarm when not armed
    test(sw, '4h. Disarm when not armed',
         lambda: sw.arm_transfer(False),
         expect_no_hang=True)

    # 4i. Boot off/on/off/on rapid
    test(sw, '4i. Rapid boot toggle (off-on-off-on)',
         lambda: [
             sw.dev.ctrl_transfer(0xC0, 0x89, 0, 0, 3, timeout=5000),
             time.sleep(0.2),
             sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000),
             time.sleep(0.3),
             sw.dev.ctrl_transfer(0xC0, 0x89, 0, 0, 3, timeout=5000),
             time.sleep(0.2),
             sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000),
             time.sleep(0.3),
         ],
         expect_no_hang=True)
    print()


def _cat5_boundary_abuse(sw):
    """CAT 5: mismatched wLength / wValue on read requests."""
    print('=== CAT 5: Boundary & Buffer Abuse ===')
    print()

    # 5a. Request 0 bytes from GET_CONFIG
    test(sw, '5a. GET_CONFIG request 0 bytes',
         lambda: sw.dev.ctrl_transfer(0xC0, 0x80, 0, 0, 0, timeout=2000),
         expect_no_hang=True)

    # 5b. Request 64 bytes from GET_CONFIG (returns 1)
    test(sw, '5b. GET_CONFIG request 64 bytes',
         lambda: sw.dev.ctrl_transfer(0xC0, 0x80, 0, 0, 64, timeout=2000),
         expect_no_hang=True)

    # 5c. Request 64 bytes from GET_LAST_ERROR (returns 1)
    test(sw, '5c. GET_LAST_ERROR request 64 bytes',
         lambda: sw.dev.ctrl_transfer(0xC0, 0xBC, 0, 0, 64, timeout=2000),
         expect_no_hang=True)

    # 5d. GET_FW_VERS request 1 byte (returns 6)
    test(sw, '5d. GET_FW_VERS request 1 byte',
         lambda: sw.dev.ctrl_transfer(0xC0, 0x92, 0, 0, 1, timeout=2000),
         expect_no_hang=True)

    # 5e. GET_STREAM_DIAG request 1 byte (returns 12)
    test(sw, '5e. GET_STREAM_DIAG request 1 byte',
         lambda: sw.dev.ctrl_transfer(0xC0, 0xBD, 0, 0, 1, timeout=2000),
         expect_no_hang=True)

    # 5f. GET_STREAM_DIAG with wval=0xFFFF (reset flag, but not 1)
    test(sw, '5f. GET_STREAM_DIAG wval=0xFFFF',
         lambda: sw.dev.ctrl_transfer(0xC0, 0xBD, 0xFFFF, 0, 12, timeout=2000),
         expect_no_hang=True)

    # 5g. GET_HOTPLUG with wval=0xFFFF (unknown sub-command)
    test(sw, '5g. GET_HOTPLUG wval=0xFFFF',
         lambda: sw.dev.ctrl_transfer(0xC0, 0xBE, 0xFFFF, 0, 36, timeout=2000),
         expect_no_hang=True)
    print()


def _cat6_rapid_fire(sw):
    """CAT 6: back-to-back requests; each loop is timed and counted as a pass."""
    global passed

    print('=== CAT 6: Rapid-Fire Stress ===')
    print()

    t0 = time.perf_counter()
    for _ in range(200):
        sw.get_config()
    dt = time.perf_counter() - t0
    print(f' [PASS] 6a. 200 config reads: {dt * 1000:.0f}ms '
          f'({dt / 200 * 1000:.1f}ms/read)')
    passed += 1

    t0 = time.perf_counter()
    for _ in range(50):
        get_err(sw)
    dt = time.perf_counter() - t0
    print(f' [PASS] 6b. 50 error reads: {dt * 1000:.0f}ms '
          f'({dt / 50 * 1000:.1f}ms/read)')
    passed += 1

    t0 = time.perf_counter()
    errs = 0
    for _ in range(30):
        try:
            sw.signal_monitor()
        except Exception:
            errs += 1
    dt = time.perf_counter() - t0
    print(f' [PASS] 6c. 30 signal monitors: {dt * 1000:.0f}ms ({errs} errors)')
    passed += 1

    sw.start_intersil(True)
    time.sleep(0.1)
    t0 = time.perf_counter()
    for i in range(40):
        # Alternate the argument True/False on every iteration.
        sw.set_lnb_voltage(i % 2 == 0)
    dt = time.perf_counter() - t0
    print(f' [PASS] 6d. 40 voltage toggles: {dt * 1000:.0f}ms')
    passed += 1

    t0 = time.perf_counter()
    errs = 0
    for i in range(10):
        try:
            # Last byte cycles through 0xF0..0xF3.
            sw.send_diseqc_message(bytes([0xE0, 0x10, 0x38, 0xF0 | (i & 3)]))
            time.sleep(0.05)
        except Exception:
            # Best-effort stress loop: keep going, but report the count
            # instead of discarding failures silently.
            errs += 1
    dt = time.perf_counter() - t0
    print(f' [PASS] 6e. 10 DiSEqC msgs: {dt * 1000:.0f}ms ({errs} errors)')
    passed += 1
    print()


def _cat7_invalid_commands(sw):
    """CAT 7: vendor request codes probed for rejection, plus wrong-direction use."""
    global passed

    print('=== CAT 7: Invalid Vendor Commands ===')
    print()
    for cmd, name in [(0xFF, '0xFF'), (0x01, '0x01'), (0x50, '0x50'),
                      (0xFE, '0xFE'), (0x00, '0x00'), (0x79, '0x79')]:
        try:
            r = sw.dev.ctrl_transfer(0xC0, cmd, 0, 0, 1, timeout=2000)
            # NOTE(review): as in test(), [INFO] results are left uncounted;
            # the collapsed source is ambiguous on this point.
            print(f' [INFO] 7. Cmd {name}: accepted (0x{r[0]:02X})')
        except usb.core.USBError:
            print(f' [PASS] 7. Cmd {name}: STALL (rejected)')
            passed += 1

    test(sw, '7g. GET_CONFIG as OUT direction',
         lambda: sw.dev.ctrl_transfer(0x40, 0x80, 0, 0,
                                      bytes([0xAA]), timeout=2000),
         expect_no_hang=True)
    test(sw, '7h. 64-byte payload to GET_CONFIG',
         lambda: sw.dev.ctrl_transfer(0x40, 0x80, 0, 0,
                                      bytes(64), timeout=2000),
         expect_no_hang=True)
    print()


def main():
    """Run every surviving test category and print the final report.

    Returns:
        0 if no test failed and the device still responds, otherwise 1.
    """
    with SkyWalker1() as sw:
        print('=' * 64)
        print(' HAMILTON ADVERSARIAL TEST SUITE — SkyWalker-1 v3.05.0')
        print(' "What if the astronaut pushes the wrong button?"')
        print('=' * 64)
        print()

        try:
            # Ensure clean starting state
            sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000)
            sw.start_intersil(True)
            time.sleep(0.5)

            # ========================================================
            _cat1_diseqc_abuse(sw)
            # ========================================================
            _cat2_tune_abuse(sw)
            # NOTE(review): CAT 3 is missing from the source; see the note
            # in _cat2_tune_abuse().
            # ========================================================
            _cat4_bcm_power_state(sw)
            # ========================================================
            _cat5_boundary_abuse(sw)
            # ========================================================
            _cat6_rapid_fire(sw)
            # ========================================================
            _cat7_invalid_commands(sw)
        finally:
            # ========================================================
            # CLEANUP
            # ========================================================
            # Runs even if a category raised, so the tone, LNB voltage and
            # Intersil are switched off on every exit path.
            sw.set_22khz_tone(False)
            sw.set_lnb_voltage(False)
            sw.start_intersil(False)

        time.sleep(0.2)
        alive = device_alive(sw)
        final_err = get_err(sw)

        print('=' * 64)
        print(' HAMILTON ADVERSARIAL TEST — FINAL RESULTS')
        print(' -----------------------------------------')
        print(f' Tests passed: {passed}')
        print(f' Tests failed: {failed}')
        print(f' Device alive: {alive}')
        print(f' Final error: 0x{final_err:02X} [{err_name(final_err)}]')
        print(f' Watchdog fired: {"YES!" if final_err == 0x0D else "No"}')
        verdict = 'PASS' if failed == 0 and alive else 'FAIL'
        print(f' Verdict: {verdict}')
        print('=' * 64)

    return 0 if verdict == 'PASS' else 1


if __name__ == '__main__':
    sys.exit(main())