Add EEPROM flash tool, TS analyzer, DVB-S2 investigation, and tune.py bugfix

New tools:
- tools/eeprom_write.py: EEPROM firmware flash with backup, verify, dry-run
- tools/ts_analyze.py: MPEG-2 transport stream analyzer with PAT/PMT parsing

DVB-S2 investigation confirms BCM4500 hardware limitation (no LDPC/BCH silicon).

Fix --json flag on tune.py subcommands (argparse parent/child scoping).
All tools verified against live SkyWalker-1 hardware.
This commit is contained in:
Ryan Malloy 2026-02-11 14:46:20 -07:00
parent a2845c37fb
commit c7b5932cc0
4 changed files with 1732 additions and 1 deletions

255
dvb-s2-investigation.md Normal file
View File

@ -0,0 +1,255 @@
# DVB-S2 Incompatibility Investigation: Genpix SkyWalker-1
## Definitive Conclusion
**The SkyWalker-1's inability to receive DVB-S2 is a fundamental hardware limitation of the Broadcom BCM4500 demodulator silicon, not a firmware limitation.** The BCM4500 was designed and fabricated before the DVB-S2 standard was ratified (2005) and contains no LDPC or BCH decoder hardware. DVB-S2 requires LDPC (Low-Density Parity-Check) and BCH (Bose-Chaudhuri-Hocquenghem) forward error correction -- entirely different decoder architectures from the Viterbi/turbo/Reed-Solomon decoders present in the BCM4500. No firmware update could add DVB-S2 support to this hardware.
Genpix eventually addressed this by releasing the SkyWalker-3, which replaced the entire demodulator subsystem (likely switching from Broadcom BCM4500 to STMicroelectronics STV0903), trading turbo-FEC support for DVB-S2 LDPC/BCH capability.
---
## 1. Does the BCM4500 Silicon Support DVB-S2?
**No. The BCM4500 has no LDPC or BCH decoder hardware.**
### BCM4500 FEC Architecture (from datasheet)
The BCM4500 contains exactly two FEC decoder paths:
1. **Advanced Modulation Turbo FEC Decoder** -- an iterative turbo code decoder supporting:
- QPSK: rates 1/4, 1/2, 3/4
- 8PSK: rates 2/3, 3/4, 5/6, 8/9
- 16QAM: rate 3/4
- Reed-Solomon outer code (t=10) after turbo decoding
2. **Legacy DVB/DIRECTV/DCII-Compliant FEC Decoder** -- a concatenated coding chain:
- Inner: Viterbi decoder (convolutional code, rates 1/2 through 7/8)
- Outer: Reed-Solomon decoder
The datasheet describes the signal path explicitly: "Optimized soft decisions are then fed into either a DVB/DIRECTV/DCII-compliant FEC decoder, or an advanced modulation turbo decoder." These are the only two paths. There is no third path for LDPC/BCH.
### DVB-S2 FEC Architecture (for comparison)
DVB-S2 (EN 302 307, ratified March 2005) mandates:
- **Inner code**: LDPC (Low-Density Parity-Check) -- block lengths of 64,800 or 16,200 bits
- **Outer code**: BCH (Bose-Chaudhuri-Hocquenghem)
- **Code rates**: 1/4, 1/3, 2/5, 1/2, 3/5, 2/3, 3/4, 4/5, 5/6, 8/9, 9/10
LDPC decoding requires dedicated hardware: large block RAM for message passing (the LDPC block size is 64,800 bits, requiring significant on-chip storage), iterative belief propagation logic, and a fundamentally different decoder architecture from both Viterbi and turbo decoders. This cannot be emulated in firmware on the BCM4500's simple 8-bit on-chip microcontroller (used only for configuration, acquisition, and monitoring -- not data-path processing).
### Evidence from the BCM4500 Datasheet
Source: [BCM4500 Datasheet (DatasheetQ)](https://html.datasheetq.com/pdf-html/885700/Broadcom/2page/BCM4500.html), [BCM4500 Datasheet (Elcodis)](https://elcodis.com/parts/5786421/BCM4500.html)
Key specifications confirming no DVB-S2 capability:
- Modulation: BPSK, QPSK, 8PSK, 16QAM (no mention of DVB-S2-specific constellations)
- FEC: "advanced modulation turbo FEC decoder" and "DVB/DIRECTV/DCII-compliant FEC decoder"
- Symbol rate: 256 Ksps to 30 Msps
- Package: 128-pin MQFP
- Supply: 3.3V I/O, 1.8V digital
- No mention of LDPC, BCH, or DVB-S2 anywhere in the datasheet
---
## 2. What FEC Types Does the BCM4500 Actually Support?
The BCM4500 supports three distinct FEC coding families, none of which are DVB-S2 compatible:
### 2.1 Viterbi + Reed-Solomon (Legacy DVB-S / DSS / DCII)
Used for standard DVB-S QPSK, DSS QPSK, DVB-S BPSK, and Digicipher II modes.
**Firmware evidence** (from `skywalker1-hardware-reference.md`, Section 6.3):
- FEC lookup table at XRAM 0xE0F9, maximum index 7
- Modulation dispatch sets XRAM 0xE0F6 = 0x00 (turbo flag OFF)
- XRAM 0xE0F5 = 0x10 (standard demod mode)
- The firmware FEC table supports rates: 1/2, 2/3, 3/4, 5/6, 7/8, auto, none
**Windows driver evidence** (`SkyWalker1Control.h`, line 59):
```c
m_CurResource.ulInnerFecType = BDA_FEC_VITERBI;
```
**Windows driver evidence** (`SkyWalker1TunerFilter.cpp`, lines 1069-1070):
```c
//Only supported FEC VITERBI Type Error Correction
else if(ulNewInnerFecType == BDA_FEC_VITERBI)
```
The Windows BDA driver explicitly rejects any FEC type other than `BDA_FEC_VITERBI` and restricts code rates to 1/2, 2/3, 3/4, 5/6, 7/8 (lines 1112-1116). There is no `BDA_FEC_LDPC` handling.
### 2.2 Turbo Codes (Proprietary 8PSK/QPSK/16QAM)
Used for Turbo QPSK, Turbo 8PSK, and Turbo 16QAM -- the proprietary "advanced modulation" modes developed by Broadcom for EchoStar/Dish Network.
**Firmware evidence** (from `tuning-protocol-analysis.md`, Section 3):
- Turbo QPSK: FEC table at XRAM 0xE0B7, max index 5
- Turbo 8PSK: FEC table at XRAM 0xE0B1, max index 5
- Turbo 16QAM: FEC table at XRAM 0xE0BC, max index 1
- All turbo modes set XRAM 0xE0F6 = 0x01 (turbo flag ON)
These turbo codes are proprietary to EchoStar/Broadcom. They are NOT the same as DVB-S2's LDPC codes, despite both being "advanced" coding schemes. The turbo decoder uses a fundamentally different iterative decoding algorithm (parallel concatenated convolutional codes) compared to LDPC (sparse parity-check matrix belief propagation).
### 2.3 Digicipher II (Motorola/GI Proprietary)
Used for DCII combo, split I/Q, and offset QPSK modes.
**Firmware evidence**: FEC table at XRAM 0xE0BD, max index 9, with a fixed FEC code of 0xFC written to XRAM 0xE0EB.
### Summary: FEC Architecture Comparison
| Feature | BCM4500 (SkyWalker-1) | DVB-S2 Requirement |
|---------|----------------------|-------------------|
| Inner FEC | Viterbi (DVB-S) or Turbo (proprietary) | LDPC |
| Outer FEC | Reed-Solomon (t=10) | BCH |
| Block size | Convolutional (streaming) / Turbo (short blocks) | 64,800 or 16,200 bits |
| Decoder type | Trellis-based (Viterbi) or iterative turbo | Iterative belief propagation |
| Hardware IP | Hardwired Viterbi + turbo silicon | Requires dedicated LDPC engine |
| Standardization | DVB-S (ETSI EN 300 421) + proprietary turbo | DVB-S2 (ETSI EN 302 307) |
---
## 3. What Would a DVB-S2-Capable Replacement Look Like?
### Broadcom's Own DVB-S2 Chip Timeline
Broadcom addressed DVB-S2 by designing entirely new silicon:
| Chip | Year | DVB-S2? | Key Feature | Source |
|------|------|---------|-------------|--------|
| **BCM4500** | ~2003 | No | Turbo FEC + legacy Viterbi/RS | [Datasheet](https://elcodis.com/parts/5786421/BCM4500.html) |
| **BCM4501** | 2006 | **Yes** | First dual-tuner DVB-S2 receiver; LDPC/BCH decoder | [Broadcom](https://www.broadcom.com/products/broadband/set-top-box/bcm4501), [EDN](https://www.edn.com/bcm4501-dual-dvb-s2-advanced-modulation-satellite-receiver/) |
| **BCM4505** | 2007 | **Yes** | Single-channel, 65nm, LDPC/BCH + legacy | [Broadcom](https://www.broadcom.com/products/broadband/set-top-box/bcm4505) |
| **BCM4506** | 2007 | **Yes** | Dual-channel, 65nm, LDPC/BCH + legacy | [Broadcom](https://www.broadcom.com/products/broadband/set-top-box/bcm4506) |
The BCM4501 datasheet explicitly states it includes "four 8-bit ADCs, all-digital variable rate QPSK/8PSK receivers, advanced modulation LDPC/BCH, and DVB-S-compliant forward error correction decoder." The addition of LDPC/BCH required new silicon -- it was not a firmware upgrade to the BCM4500.
However, Broadcom restricted sales of these chips to set-top box manufacturers (EchoStar, DIRECTV) and did not sell to PC peripheral makers. This is why Genpix could not simply drop in a BCM4501.
### What Genpix Actually Did: The SkyWalker-3
Genpix released the SkyWalker-3 as a DVB-S2-capable successor, using a completely different demodulator:
| Feature | SkyWalker-1 (BCM4500) | SkyWalker-3 (likely STV0903) |
|---------|----------------------|---------------------------|
| DVB-S QPSK | Yes | Yes |
| DVB-S2 QPSK | No | Yes (rates 1/2 through 9/10) |
| DVB-S2 8PSK | No | Yes (rates 3/5 through 9/10) |
| Turbo QPSK | Yes | **No** |
| Turbo 8PSK | Yes | **No** |
| Turbo 16QAM | Yes | **No** |
| DCII | Yes | Yes |
| DSS | Yes | Yes |
| Symbol rate (DVB-S) | 256 Ksps - 30 Msps | 1 - 45 Msps |
| Symbol rate (DVB-S2) | N/A | 5 - 33 Msps |
| FEC inner (DVB-S) | Viterbi | Viterbi |
| FEC inner (DVB-S2) | N/A | LDPC |
| FEC outer (DVB-S2) | N/A | BCH |
| Demodulator | Broadcom BCM4500 | STMicroelectronics STV0903 (probable) |
| Tuner | Broadcom BCM3440 | STMicroelectronics STV6110 (probable) |
Source: [Genpix SkyWalker-3 specifications](https://www.genpix-electronics.com/what-is-skywalker-3.html)
The trade-off is visible: the SkyWalker-3 gained DVB-S2 but lost turbo-FEC support entirely. The turbo codes were proprietary to Broadcom/EchoStar, and the STMicroelectronics STV0903 demodulator does not implement them. This means the SkyWalker-3 cannot receive Dish Network's legacy turbo-coded 8PSK transmissions.
---
## 4. Are There Any Hints of DVB-S2 Awareness in the Firmware?
**No. There are zero references to DVB-S2, LDPC, or BCH in any firmware version or in the Windows driver source.**
### Firmware Search Results
Searched all three firmware versions (v2.06, Rev.2 v2.10, v2.13) via Ghidra disassembly and the following source files:
- `SkyWalker1Control.h` -- defines modulation constants 0-9, none related to DVB-S2
- `SkyWalker1CommonDef.h` -- device parameter structure uses `BDA_FEC_VITERBI` only
- `SkyWalker1TunerFilter.cpp` -- explicitly rejects non-QPSK modulation types and non-Viterbi FEC
- `SkyWalker1Control.cpp` -- hardcodes `ADV_MOD_DVB_QPSK` (value 0) in tune command byte 8
**Specific evidence of no DVB-S2 awareness:**
1. **Modulation enum caps at 9** (`SkyWalker1Control.h`, lines 64-74): The modulation constants are `ADV_MOD_DVB_QPSK` (0) through `ADV_MOD_DVB_BPSK` (9). No value 10+ exists for DVB-S2 modes.
2. **Firmware dispatch table has exactly 10 entries** (`tuning-protocol-analysis.md`, Section 3.1): The jump table at CODE:0873 contains 20 bytes (10 entries x 2 bytes). Modulation values >= 10 are rejected by the bounds check at CODE:0866.
3. **FEC type is hardcoded to Viterbi** (`SkyWalker1TunerFilter.cpp`, line 1070): `else if(ulNewInnerFecType == BDA_FEC_VITERBI)` -- only Viterbi is accepted; any other FEC type returns `STATUS_INVALID_PARAMETER`.
4. **Tune command hardcodes DVB-S QPSK** (`SkyWalker1Control.cpp`, line 292): `ucCommand[8] = ADV_MOD_DVB_QPSK;` -- the Windows driver always sends modulation type 0 (DVB-S QPSK) regardless of what the application requests.
5. **No LDPC/BCH code rate values** exist in any FEC lookup table. The firmware's XRAM tables at 0xE0B1, 0xE0B7, 0xE0BC, 0xE0BD, and 0xE0F9 contain only Viterbi rates (1/2 through 7/8), turbo rates, and DCII combined codes.
6. **No DVB-S2-specific register addresses** appear in the I2C traffic. The BCM4500 is programmed exclusively through indirect registers 0xA6/0xA7/0xA8 with page 0x00 -- a protocol specific to the BCM4500. DVB-S2 demodulators like the STV0903 use entirely different register maps.
---
## 5. Could the GPIF Streaming Path Handle DVB-S2 Data Rates?
**Yes -- the USB data path is not the bottleneck. The GPIF/USB 2.0 streaming architecture could handle DVB-S2 data rates if the demodulator supported them.**
### Data Rate Analysis
**DVB-S2 maximum useful bit rate** (from ETSI EN 302 307):
- Highest configuration: 8PSK, rate 9/10, 30 Msps = ~72 Mbps raw, ~58 Mbps net after FEC
- Typical HD transponder: 8PSK, rate 3/4, 27.5 Msps = ~44 Mbps net
**GPIF/USB 2.0 throughput capacity:**
- USB 2.0 High Speed bulk: 480 Mbps theoretical, ~35 MB/s (~280 Mbps) practical
- GPIF engine: 48 MHz clock, 8-bit data path = 48 MB/s (384 Mbps) theoretical
- EP2 FIFO: 4x buffer with AUTOIN, 7 URBs x 8KB on host side
**Current DVB-S usage** (from `gpif-streaming-analysis.md`, Section 15):
- "Transport stream rate: BCM4500 outputs at the satellite symbol rate (up to 30 Msps), but the effective byte rate depends on modulation and coding. USB 2.0 High Speed bulk bandwidth (480 Mbps theoretical, ~35 MB/s practical) is more than sufficient for DVB-S transport streams (typically 1-5 MB/s)."
**Assessment**: Even at the theoretical maximum DVB-S2 data rate of ~58 Mbps (~7.25 MB/s), the USB 2.0 bulk streaming path has approximately 5x headroom. The GPIF engine configuration (IFCONFIG=0xEE, EP2FIFOCFG=0x0C, FLOWSTATEA with FSEN enabled) is identical across all firmware versions and provides a fully hardware-managed pipeline that would not require modification.
The 8-bit parallel transport stream interface between the demodulator and FX2 is also sufficient -- DVB-S2 uses the same MPEG-TS output format (188-byte packets) as DVB-S. The GPIF waveform and AUTOIN configuration would work unchanged.
**However**, this is a moot point. The bottleneck is the demodulator silicon, not the data path. Even if you physically replaced the BCM4500 with a DVB-S2-capable chip, you would need to rewrite the entire FX2 firmware (I2C register protocol, tuning sequence, modulation dispatch, FEC configuration) since every DVB-S2 demodulator uses a completely different register interface.
---
## Summary
| Question | Answer |
|----------|--------|
| Is DVB-S2 a hardware or firmware limitation? | **Hardware** -- the BCM4500 has no LDPC/BCH decoder logic |
| Could a firmware update add DVB-S2? | **No** -- LDPC decoding requires dedicated silicon |
| Which Broadcom chip first added LDPC? | **BCM4501** (2006), followed by BCM4505/BCM4506 (2007) |
| Any DVB-S2 hints in firmware/driver? | **None** -- zero references to LDPC, BCH, or DVB-S2 |
| Is the USB data path a bottleneck? | **No** -- USB 2.0 bulk has ~5x headroom for DVB-S2 rates |
| What did Genpix do for DVB-S2? | Released SkyWalker-3 with a different demodulator (likely STV0903) |
| What was lost in the transition? | Turbo-FEC support (proprietary to Broadcom/EchoStar) |
---
## Sources
### Datasheets and Product Pages
- [BCM4500 Datasheet (DatasheetQ)](https://html.datasheetq.com/pdf-html/885700/Broadcom/2page/BCM4500.html)
- [BCM4500 Datasheet (Elcodis)](https://elcodis.com/parts/5786421/BCM4500.html)
- [BCM4500 Datasheet (AllDatasheet)](https://www.alldatasheet.com/datasheet-pdf/pdf/85246/BOARDCOM/BCM4500.html)
- [BCM4501 Product Page (Broadcom)](https://www.broadcom.com/products/broadband/set-top-box/bcm4501)
- [BCM4501 (EDN)](https://www.edn.com/bcm4501-dual-dvb-s2-advanced-modulation-satellite-receiver/)
- [BCM4505 Product Page (Broadcom)](https://www.broadcom.com/products/broadband/set-top-box/bcm4505)
- [BCM4506 Product Page (Broadcom)](https://www.broadcom.com/products/broadband/set-top-box/bcm4506)
- [BCM4505/BCM4506 Announcement (RTTNews)](https://www.rttnews.com/380136/broadcom-launches-bcm4505-and-bcm4506-two-fully-integrated-single-chip-single-and-dual-channel-multi-format-satellite-receivers-quick-facts.aspx)
### Genpix Products
- [Genpix SkyWalker-3 Specifications](https://www.genpix-electronics.com/what-is-skywalker-3.html)
- [Genpix Official Site](https://www.genpix-electronics.com/index.php?act=viewDoc&docId=9)
### Community and Technical Discussions
- [LinuxTV mailing list: BCM4500 and DVB-S2 distinction](https://www.mail-archive.com/linux-dvb@linuxtv.org/msg24808.html)
- [SatelliteGuys: Turbo 8PSK card reverse engineering](https://www.satelliteguys.us/xen/threads/another-turbo-8psk-card.246879/)
- [SatelliteGuys: Genpix SkyWalker-1 discussion](https://www.satelliteguys.us/xen/threads/genpix-skywalker-1.214196/)
### Reverse Engineering Analysis (This Project)
- `skywalker1-hardware-reference.md` -- Sections 1, 6, 7: Overview, tuning protocol, BCM4500 interface
- `tuning-protocol-analysis.md` -- Section 3: Modulation dispatch table, FEC lookup tables
- `gpif-streaming-analysis.md` -- Sections 13-15: GPIF throughput and data path analysis
- `rev2-deep-analysis.md` -- Complete Rev.2 function inventory
- `SkyWalker1Control.h` -- Modulation mode constants (lines 63-74), FEC/command definitions
- `SkyWalker1TunerFilter.cpp` -- SetInnerFecType() Viterbi-only restriction (lines 1058-1086)
- `SkyWalker1Control.cpp` -- TuneDevice() hardcoded ADV_MOD_DVB_QPSK (line 292)

575
tools/eeprom_write.py Executable file
View File

@ -0,0 +1,575 @@
#!/usr/bin/env python3
"""
Genpix SkyWalker-1 EEPROM firmware flash tool.
Writes C2-format firmware images to the Cypress FX2 boot EEPROM via
the I2C_WRITE vendor command.
Protocol:
I2C_WRITE (0x83): wValue=0x51, wIndex=offset, data=bytes
I2C_READ (0x84): wValue=0x51, wIndex=offset, length=chunk_size
The EEPROM uses Cypress C2 IIC boot format:
- Header: C2 VID_L VID_H PID_L PID_H DID_L DID_H CONFIG
- Records: LEN_H LEN_L ADDR_H ADDR_L DATA[LEN]
- End: 80 01 ENTRY_H ENTRY_L (reset vector)
WARNING: Flashing incorrect firmware can brick the device. The FX2
boots from this EEPROM on power-up -- a corrupted image means the
device will not enumerate on USB until the EEPROM is reprogrammed
with an external programmer or the FX2 boot ROM's A0 vendor request.
"""
import usb.core, usb.util, sys, struct, time, os
VENDOR_ID = 0x09C0
PRODUCT_ID = 0x0203
I2C_WRITE = 0x83
I2C_READ = 0x84
EEPROM_SLAVE = 0x51
# EEPROM page write parameters
PAGE_SIZE = 16 # Conservative page size for 24Cxx EEPROMs
WRITE_CYCLE_MS = 10 # Max internal write cycle time per page
MAX_EEPROM_SIZE = 16384 # 128Kbit / 16KB max addressable via wIndex
def find_device():
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if dev is None:
print("SkyWalker-1 not found")
sys.exit(1)
return dev
def detach_driver(dev):
intf_num = None
for cfg in dev:
for intf in cfg:
if dev.is_kernel_driver_active(intf.bInterfaceNumber):
try:
dev.detach_kernel_driver(intf.bInterfaceNumber)
intf_num = intf.bInterfaceNumber
except usb.core.USBError as e:
print(f"Cannot detach driver: {e}")
print("Try: sudo modprobe -r dvb_usb_gp8psk")
sys.exit(1)
try:
dev.set_configuration()
except:
pass
return intf_num
def eeprom_read(dev, offset, length=64):
"""Read from EEPROM at given offset."""
return dev.ctrl_transfer(
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN,
I2C_READ, EEPROM_SLAVE, offset, length, 2000)
def eeprom_write(dev, offset, data):
"""Write data to EEPROM at given offset. Caller handles page alignment."""
return dev.ctrl_transfer(
usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT,
I2C_WRITE, EEPROM_SLAVE, offset, data, 2000)
def eeprom_read_all(dev, size, label="Reading"):
"""Read entire EEPROM contents up to size bytes."""
chunk_size = 64
data = bytearray()
for offset in range(0, size, chunk_size):
remaining = min(chunk_size, size - offset)
chunk = eeprom_read(dev, offset, remaining)
if chunk is None:
print(f"\n Read failed at offset 0x{offset:04X}")
return None
data.extend(bytes(chunk))
if offset % 1024 == 0:
pct = offset * 100 // size
print(f"\r {label}: 0x{offset:04X} / 0x{size:04X} [{pct:3d}%]",
end="", flush=True)
print(f"\r {label}: 0x{size:04X} / 0x{size:04X} [100%] ")
return data
def parse_c2_header(data):
"""Parse Cypress C2 boot EEPROM header. Returns dict or None."""
if len(data) < 8:
return None
if data[0] != 0xC2:
return None
vid = data[2] << 8 | data[1]
pid = data[4] << 8 | data[3]
did = data[6] << 8 | data[5]
config = data[7]
return {"vid": vid, "pid": pid, "did": did, "config": config}
def parse_records(data, offset=8):
"""Parse C2 load records from EEPROM data."""
records = []
while offset < len(data) - 4:
rec_len = (data[offset] << 8) | data[offset + 1]
rec_addr = (data[offset + 2] << 8) | data[offset + 3]
if rec_len == 0x8001:
records.append({
"type": "end",
"entry_point": rec_addr,
"offset": offset
})
break
elif rec_len == 0 or rec_len > 0x4000:
records.append({
"type": "invalid",
"raw_len": rec_len,
"offset": offset
})
break
rec_data = data[offset + 4:offset + 4 + rec_len]
records.append({
"type": "data",
"length": rec_len,
"load_addr": rec_addr,
"data": bytes(rec_data),
"offset": offset
})
offset += 4 + rec_len
return records
def print_c2_header(header, prefix=" "):
"""Display parsed C2 header fields."""
print(f"{prefix}Format: C2 (Large EEPROM, code loads to internal RAM)")
print(f"{prefix}VID: 0x{header['vid']:04X}"
f" {'(Genpix)' if header['vid'] == 0x09C0 else ''}")
print(f"{prefix}PID: 0x{header['pid']:04X}"
f" {'(SkyWalker-1)' if header['pid'] == 0x0203 else ''}")
print(f"{prefix}DID: 0x{header['did']:04X}")
print(f"{prefix}Config: 0x{header['config']:02X}", end="")
config_flags = []
if header["config"] & 0x40:
config_flags.append("400kHz I2C")
if header["config"] & 0x04:
config_flags.append("disconnect")
if config_flags:
print(f" ({', '.join(config_flags)})")
else:
print()
def print_c2_records(records, prefix=" "):
"""Display parsed C2 load records."""
total_code = 0
for i, rec in enumerate(records):
if rec["type"] == "data":
end_addr = rec["load_addr"] + rec["length"] - 1
preview = rec["data"][:8].hex(' ')
print(f"{prefix}[{i}] {rec['length']:5d} bytes -> "
f"0x{rec['load_addr']:04X}-0x{end_addr:04X} "
f"[{preview}...]")
total_code += rec["length"]
elif rec["type"] == "end":
print(f"{prefix}[{i}] END MARKER -> entry point: "
f"0x{rec['entry_point']:04X}")
else:
print(f"{prefix}[{i}] INVALID (raw_len=0x{rec['raw_len']:04X}) "
f"at EEPROM offset 0x{rec['offset']:04X}")
data_recs = [r for r in records if r["type"] == "data"]
print(f"\n{prefix}Total firmware: {total_code} bytes in "
f"{len(data_recs)} segments")
end_recs = [r for r in records if r["type"] == "end"]
if end_recs:
print(f"{prefix}Entry point: 0x{end_recs[0]['entry_point']:04X} "
f"(LJMP target after boot)")
def validate_c2_image(data, label="image"):
"""Validate a C2 firmware image. Returns (header, records) or exits."""
if len(data) < 12:
print(f" {label}: too small ({len(data)} bytes, need at least 12)")
return None, None
if data[0] != 0xC2:
print(f" {label}: not a C2 image (first byte: 0x{data[0]:02X}, "
f"expected 0xC2)")
return None, None
header = parse_c2_header(data)
if header is None:
print(f" {label}: failed to parse C2 header")
return None, None
records = parse_records(data)
if not records:
print(f" {label}: no load records found")
return None, None
end_recs = [r for r in records if r["type"] == "end"]
invalid_recs = [r for r in records if r["type"] == "invalid"]
if not end_recs:
print(f" {label}: WARNING -- no end marker found")
if invalid_recs:
print(f" {label}: WARNING -- {len(invalid_recs)} invalid record(s)")
return header, records
def cmd_info(args):
"""Parse and display C2 header info from a .bin file."""
if not os.path.exists(args.file):
print(f"File not found: {args.file}")
sys.exit(1)
with open(args.file, 'rb') as f:
data = f.read()
print(f"C2 Image: {args.file}")
print(f"File size: {len(data)} bytes")
print("=" * 40)
header, records = validate_c2_image(data, args.file)
if header is None:
sys.exit(1)
print("\nHeader:")
print_c2_header(header)
print("\nLoad Records:")
print_c2_records(records)
# Compute EEPROM usage (header + record headers + data + end marker)
if records:
last = records[-1]
if last["type"] == "end":
eeprom_end = last["offset"] + 4
elif last["type"] == "data":
eeprom_end = last["offset"] + 4 + last["length"]
else:
eeprom_end = last["offset"]
print(f"\n EEPROM footprint: {eeprom_end} bytes "
f"(0x{eeprom_end:04X})")
def cmd_backup(args):
"""Dump current EEPROM contents to a file."""
print("Genpix SkyWalker-1 EEPROM Backup")
print("=" * 40)
dev = find_device()
print(f"Found device: Bus {dev.bus} Addr {dev.address}")
intf = detach_driver(dev)
try:
size = args.max_size
print(f"\nReading EEPROM ({size} bytes)...")
data = eeprom_read_all(dev, size)
if data is None:
print("Backup failed: read error")
sys.exit(1)
with open(args.output, 'wb') as f:
f.write(data)
print(f" Saved to: {args.output}")
# Show header info
header = parse_c2_header(data)
if header:
print("\nHeader:")
print_c2_header(header)
finally:
if intf is not None:
try:
usb.util.release_interface(dev, intf)
dev.attach_kernel_driver(intf)
print("\nRe-attached kernel driver")
except:
print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")
def cmd_verify(args):
"""Compare a .bin file against current EEPROM contents."""
if not os.path.exists(args.file):
print(f"File not found: {args.file}")
sys.exit(1)
with open(args.file, 'rb') as f:
image = f.read()
print("Genpix SkyWalker-1 EEPROM Verify")
print("=" * 40)
# Validate the image first
header, records = validate_c2_image(image, args.file)
if header is None:
sys.exit(1)
print(f"\nImage: {args.file} ({len(image)} bytes)")
print_c2_header(header)
dev = find_device()
print(f"\nFound device: Bus {dev.bus} Addr {dev.address}")
intf = detach_driver(dev)
try:
print(f"\nReading EEPROM ({len(image)} bytes)...")
eeprom = eeprom_read_all(dev, len(image), label="Verify")
if eeprom is None:
print("Verify failed: read error")
sys.exit(1)
# Compare byte-by-byte
mismatches = []
for i in range(len(image)):
if i < len(eeprom) and image[i] != eeprom[i]:
mismatches.append(i)
if not mismatches:
print(f"\n MATCH -- EEPROM contents match {args.file}")
else:
print(f"\n MISMATCH -- {len(mismatches)} byte(s) differ:")
for off in mismatches[:32]:
exp = image[off]
got = eeprom[off] if off < len(eeprom) else 0xFF
print(f" 0x{off:04X}: expected 0x{exp:02X}, "
f"got 0x{got:02X}")
if len(mismatches) > 32:
print(f" ... and {len(mismatches) - 32} more")
sys.exit(1)
finally:
if intf is not None:
try:
usb.util.release_interface(dev, intf)
dev.attach_kernel_driver(intf)
print("\nRe-attached kernel driver")
except:
print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")
def cmd_flash(args):
"""Write a C2-format .bin file to the EEPROM."""
if not os.path.exists(args.file):
print(f"File not found: {args.file}")
sys.exit(1)
with open(args.file, 'rb') as f:
image = f.read()
print("Genpix SkyWalker-1 EEPROM Flash")
print("=" * 40)
print()
print(" *** FIRMWARE FLASH -- READ CAREFULLY ***")
print(" Writing bad firmware will brick the device.")
print(" The SkyWalker-1 boots from this EEPROM on power-up.")
print(" A corrupted image = no USB enumeration.")
print()
# Validate input image
img_header, img_records = validate_c2_image(image, args.file)
if img_header is None:
sys.exit(1)
print(f"Image: {args.file} ({len(image)} bytes)")
print_c2_header(img_header)
# Size sanity check
if len(image) > MAX_EEPROM_SIZE:
print(f"\n Image too large: {len(image)} bytes "
f"(max {MAX_EEPROM_SIZE})")
sys.exit(1)
if len(image) < 12:
print(f"\n Image too small: {len(image)} bytes")
sys.exit(1)
# Connect to device
dev = find_device()
print(f"\nFound device: Bus {dev.bus} Addr {dev.address}")
intf = detach_driver(dev)
try:
# Check VID/PID against the connected device
if not args.force:
if img_header["vid"] != VENDOR_ID:
print(f"\n VID mismatch: image has 0x{img_header['vid']:04X},"
f" device is 0x{VENDOR_ID:04X}")
print(" Use --force to override")
sys.exit(1)
if img_header["pid"] != PRODUCT_ID:
print(f"\n PID mismatch: image has 0x{img_header['pid']:04X},"
f" device is 0x{PRODUCT_ID:04X}")
print(" Use --force to override")
sys.exit(1)
elif img_header["vid"] != VENDOR_ID or img_header["pid"] != PRODUCT_ID:
print(f"\n WARNING: VID/PID mismatch (--force active)")
print(f" Image: VID=0x{img_header['vid']:04X} "
f"PID=0x{img_header['pid']:04X}")
print(f" Device: VID=0x{VENDOR_ID:04X} PID=0x{PRODUCT_ID:04X}")
# Backup current EEPROM
if not args.no_backup:
ts = time.strftime("%Y%m%d_%H%M%S")
backup_file = f"eeprom_backup_{ts}.bin"
print(f"\nBacking up current EEPROM to {backup_file}...")
backup = eeprom_read_all(dev, MAX_EEPROM_SIZE, label="Backup")
if backup is None:
print(" Backup failed: read error. Aborting.")
sys.exit(1)
with open(backup_file, 'wb') as f:
f.write(backup)
print(f" Backup saved: {backup_file} ({len(backup)} bytes)")
# Show what's currently on the EEPROM
cur_header = parse_c2_header(backup)
if cur_header:
print("\n Current EEPROM:")
print_c2_header(cur_header, prefix=" ")
else:
print("\n Skipping backup (--no-backup)")
# Dry-run stops here
if args.dry_run:
print("\n DRY RUN -- would write {0} bytes in {1} pages".format(
len(image), (len(image) + PAGE_SIZE - 1) // PAGE_SIZE))
print(" No changes made.")
return
# Final confirmation
print(f"\nAbout to write {len(image)} bytes to EEPROM...")
print(" Press Ctrl+C within 3 seconds to abort.")
try:
for i in range(3, 0, -1):
print(f"\r Writing in {i}... ", end="", flush=True)
time.sleep(1)
print("\r Writing now... ")
except KeyboardInterrupt:
print("\n Aborted.")
return
# Write in page-sized chunks
total_pages = (len(image) + PAGE_SIZE - 1) // PAGE_SIZE
write_errors = 0
for page_num in range(total_pages):
offset = page_num * PAGE_SIZE
end = min(offset + PAGE_SIZE, len(image))
chunk = image[offset:end]
pct = (page_num + 1) * 100 // total_pages
print(f"\r Write: 0x{offset:04X} / 0x{len(image):04X} "
f"[{pct:3d}%]", end="", flush=True)
try:
written = eeprom_write(dev, offset, chunk)
if written != len(chunk):
print(f"\n Short write at 0x{offset:04X}: "
f"sent {len(chunk)}, wrote {written}")
write_errors += 1
except usb.core.USBError as e:
print(f"\n Write error at 0x{offset:04X}: {e}")
write_errors += 1
# Wait for EEPROM internal write cycle
time.sleep(WRITE_CYCLE_MS / 1000.0)
print(f"\r Write: 0x{len(image):04X} / 0x{len(image):04X} "
f"[100%] ")
if write_errors:
print(f"\n WARNING: {write_errors} write error(s) occurred")
# Verify by reading back
print(f"\nVerifying ({len(image)} bytes)...")
verify = eeprom_read_all(dev, len(image), label="Verify")
if verify is None:
print(" Verify failed: read error")
print(" *** EEPROM STATE UNKNOWN -- check before power cycling ***")
sys.exit(1)
mismatches = []
for i in range(len(image)):
if i < len(verify) and image[i] != verify[i]:
mismatches.append(i)
if not mismatches:
print(f"\n VERIFIED -- all {len(image)} bytes match")
print(" Flash complete. Power cycle the device to boot new firmware.")
else:
print(f"\n VERIFY FAILED -- {len(mismatches)} byte(s) differ:")
for off in mismatches[:16]:
exp = image[off]
got = verify[off] if off < len(verify) else 0xFF
print(f" 0x{off:04X}: wrote 0x{exp:02X}, "
f"read 0x{got:02X}")
if len(mismatches) > 16:
print(f" ... and {len(mismatches) - 16} more")
print("\n *** EEPROM CONTENTS DO NOT MATCH IMAGE ***")
print(" Do NOT power cycle until this is resolved.")
sys.exit(1)
finally:
if intf is not None:
try:
usb.util.release_interface(dev, intf)
dev.attach_kernel_driver(intf)
print("\nRe-attached kernel driver")
except:
print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload")
def main():
import argparse
parser = argparse.ArgumentParser(
description="SkyWalker-1 EEPROM firmware flash tool")
sub = parser.add_subparsers(dest='command', required=True)
# info
p_info = sub.add_parser('info',
help='Parse and display C2 header from a .bin file')
p_info.add_argument('file', help='C2 firmware image (.bin)')
# backup
p_backup = sub.add_parser('backup',
help='Dump current EEPROM to a file')
p_backup.add_argument('-o', '--output', default='skywalker1_eeprom.bin',
help='Output file (default: skywalker1_eeprom.bin)')
p_backup.add_argument('--max-size', type=int, default=MAX_EEPROM_SIZE,
help=f'Bytes to read (default: {MAX_EEPROM_SIZE})')
# verify
p_verify = sub.add_parser('verify',
help='Compare .bin file against EEPROM')
p_verify.add_argument('file', help='C2 firmware image (.bin)')
# flash
p_flash = sub.add_parser('flash',
help='Write C2 firmware image to EEPROM')
p_flash.add_argument('file', help='C2 firmware image (.bin)')
p_flash.add_argument('--dry-run', action='store_true',
help='Show what would happen without writing')
p_flash.add_argument('--no-backup', action='store_true',
help='Skip pre-flash EEPROM backup')
p_flash.add_argument('--force', action='store_true',
help='Override VID/PID mismatch check')
args = parser.parse_args()
if args.command == 'info':
cmd_info(args)
elif args.command == 'backup':
cmd_backup(args)
elif args.command == 'verify':
cmd_verify(args)
elif args.command == 'flash':
cmd_flash(args)
if __name__ == '__main__':
main()

897
tools/ts_analyze.py Executable file
View File

@ -0,0 +1,897 @@
#!/usr/bin/env python3
"""
Genpix SkyWalker-1 MPEG-2 Transport Stream analyzer.
Parses and analyzes 188-byte MPEG-2 TS packets from .ts files captured
by tune.py, stdin pipes, or any standard transport stream source.
Supports PID analysis, PAT/PMT parsing, continuity counter checking,
scrambling detection, hex packet dumps, and live stream monitoring.
Reference: ISO/IEC 13818-1 (MPEG-2 Systems)
TS packet: 188 bytes, sync byte 0x47
"""
import sys
import struct
import argparse
import time
import os
TS_PACKET_SIZE = 188
TS_SYNC_BYTE = 0x47
# Well-known PID assignments (ISO 13818-1 Table 2-3)
KNOWN_PIDS = {
0x0000: "PAT",
0x0001: "CAT",
0x0002: "TSDT",
0x0010: "NIT/ST",
0x0011: "SDT/BAT/ST",
0x0012: "EIT/ST",
0x0013: "RST/ST",
0x0014: "TDT/TOT/ST",
0x001E: "DIT",
0x001F: "SIT",
0x1FFF: "Null",
}
# Stream type identifiers (ISO 13818-1 Table 2-36)
STREAM_TYPES = {
0x00: "Reserved",
0x01: "MPEG-1 Video (11172-2)",
0x02: "MPEG-2 Video (13818-2)",
0x03: "MPEG-1 Audio (11172-3)",
0x04: "MPEG-2 Audio (13818-3)",
0x05: "Private Sections (13818-1)",
0x06: "PES Private Data",
0x07: "MHEG",
0x08: "DSM-CC",
0x09: "H.222.1",
0x0A: "DSM-CC Type A",
0x0B: "DSM-CC Type B",
0x0C: "DSM-CC Type C",
0x0D: "DSM-CC Type D",
0x0E: "Auxiliary",
0x0F: "MPEG-2 AAC Audio",
0x10: "MPEG-4 Visual",
0x11: "MPEG-4 AAC Audio (LATM)",
0x15: "Metadata in PES",
0x1B: "H.264/AVC Video",
0x24: "H.265/HEVC Video",
0x42: "AVS Video",
0x81: "AC-3 Audio (ATSC)",
0x82: "DTS Audio",
0x83: "Dolby TrueHD",
0x84: "Dolby Digital Plus (EAC-3)",
0x85: "DTS-HD",
0x86: "DTS-HD Master Audio",
0x87: "EAC-3 Audio (ATSC)",
0xEA: "VC-1 Video",
}
class TSPacket:
"""Parsed MPEG-2 transport stream packet header."""
__slots__ = (
'sync', 'tei', 'pusi', 'priority', 'pid',
'scrambling', 'adaptation', 'continuity',
'adaptation_field', 'payload', 'raw',
)
def __init__(self, data: bytes):
if len(data) != TS_PACKET_SIZE:
raise ValueError(f"Packet must be {TS_PACKET_SIZE} bytes, got {len(data)}")
self.raw = data
self.sync = data[0]
self.tei = bool(data[1] & 0x80)
self.pusi = bool(data[1] & 0x40)
self.priority = bool(data[1] & 0x20)
self.pid = ((data[1] & 0x1F) << 8) | data[2]
self.scrambling = (data[3] >> 6) & 0x03
self.adaptation = (data[3] >> 4) & 0x03
self.continuity = data[3] & 0x0F
# Parse adaptation field and payload boundaries
offset = 4
self.adaptation_field = None
self.payload = None
if self.adaptation & 0x02:
# Adaptation field present
if offset < TS_PACKET_SIZE:
af_len = data[offset]
af_end = offset + 1 + af_len
if af_end <= TS_PACKET_SIZE:
self.adaptation_field = data[offset:af_end]
offset = af_end
if self.adaptation & 0x01:
# Payload present
if offset < TS_PACKET_SIZE:
self.payload = data[offset:]
def has_pcr(self) -> bool:
"""Check if adaptation field contains a PCR."""
if self.adaptation_field is None or len(self.adaptation_field) < 7:
return False
af_flags = self.adaptation_field[1] if len(self.adaptation_field) > 1 else 0
return bool(af_flags & 0x10)
def get_pcr(self) -> int:
"""Extract PCR value (in 27 MHz clock ticks). Returns -1 if no PCR."""
if not self.has_pcr():
return -1
# PCR is 6 bytes starting at adaptation_field[2]
af = self.adaptation_field
pcr_base = (af[2] << 25) | (af[3] << 17) | (af[4] << 9) | \
(af[5] << 1) | ((af[6] >> 7) & 0x01)
pcr_ext = ((af[6] & 0x01) << 8) | af[7]
return pcr_base * 300 + pcr_ext
class TSReader:
"""Reads TS packets from a file or stream, handling sync alignment."""
def __init__(self, source, verbose: bool = False):
self.source = source
self.verbose = verbose
self.offset = 0
self._sync_offset = -1
def find_sync(self, data: bytes) -> int:
"""Find sync byte alignment in raw data. Returns byte offset or -1."""
# Need at least 3 consecutive sync bytes to confirm alignment
for i in range(min(len(data), TS_PACKET_SIZE)):
if data[i] != TS_SYNC_BYTE:
continue
# Check for consecutive sync bytes at 188-byte intervals
ok = True
for check in range(1, 4):
pos = i + check * TS_PACKET_SIZE
if pos >= len(data):
# Not enough data to confirm, accept if at least one more matches
if check >= 2:
break
ok = False
break
if data[pos] != TS_SYNC_BYTE:
ok = False
break
if ok:
return i
return -1
def iter_packets(self, max_packets: int = 0):
"""Yield TSPacket objects from the source."""
buf = b''
synced = False
count = 0
while True:
chunk = self.source.read(65536)
if not chunk:
break
buf += chunk
if not synced:
sync_off = self.find_sync(buf)
if sync_off < 0:
# Keep last 187 bytes in case sync straddles chunk boundary
if len(buf) > TS_PACKET_SIZE * 4:
buf = buf[-(TS_PACKET_SIZE - 1):]
continue
self._sync_offset = sync_off + self.offset
if self.verbose and sync_off > 0:
print(f" Sync found at byte offset {sync_off}", file=sys.stderr)
buf = buf[sync_off:]
synced = True
while len(buf) >= TS_PACKET_SIZE:
pkt_data = buf[:TS_PACKET_SIZE]
buf = buf[TS_PACKET_SIZE:]
if pkt_data[0] != TS_SYNC_BYTE:
# Lost sync, try to re-acquire
synced = False
if self.verbose:
print(f" Sync lost, re-scanning...", file=sys.stderr)
break
count += 1
yield TSPacket(pkt_data)
if max_packets and count >= max_packets:
return
self.offset += len(chunk)
@property
def sync_offset(self) -> int:
return self._sync_offset
class PSIParser:
"""Parse PSI sections from TS packet payloads."""
def __init__(self):
self._section_bufs = {} # pid -> accumulated bytes
def feed(self, pkt: TSPacket) -> dict:
"""Feed a packet, return parsed section dict or None."""
if pkt.payload is None:
return None
pid = pkt.pid
payload = pkt.payload
if pkt.pusi:
# Payload Unit Start Indicator set
if len(payload) < 1:
return None
pointer = payload[0]
payload = payload[1 + pointer:]
self._section_bufs[pid] = payload
elif pid in self._section_bufs:
self._section_bufs[pid] += payload
else:
return None
return self._try_parse(pid)
def _try_parse(self, pid: int) -> dict:
"""Try to parse a complete section from the buffer."""
buf = self._section_bufs.get(pid, b'')
if len(buf) < 3:
return None
table_id = buf[0]
section_length = ((buf[1] & 0x0F) << 8) | buf[2]
total_len = 3 + section_length
if len(buf) < total_len:
return None # Incomplete, wait for more data
section = buf[:total_len]
# Clear buffer for next section
self._section_bufs[pid] = buf[total_len:]
if section_length < 5:
return None
result = {
"table_id": table_id,
"section_syntax": bool(buf[1] & 0x80),
"section_length": section_length,
"raw": section,
}
if result["section_syntax"]:
result["table_id_ext"] = (section[3] << 8) | section[4]
result["version"] = (section[5] >> 1) & 0x1F
result["current_next"] = section[5] & 0x01
result["section_number"] = section[6]
result["last_section_number"] = section[7]
result["data"] = section[8:-4]
result["crc32"] = struct.unpack_from('>I', section, total_len - 4)[0]
return result
def parse_pat(section: dict) -> dict:
"""Parse a Program Association Table section."""
if section is None or section["table_id"] != 0x00:
return None
transport_stream_id = section["table_id_ext"]
data = section["data"]
programs = {}
for i in range(0, len(data), 4):
if i + 4 > len(data):
break
prog_num = (data[i] << 8) | data[i + 1]
pmt_pid = ((data[i + 2] & 0x1F) << 8) | data[i + 3]
programs[prog_num] = pmt_pid
return {
"transport_stream_id": transport_stream_id,
"version": section["version"],
"programs": programs,
}
def parse_pmt(section: dict) -> dict:
"""Parse a Program Map Table section."""
if section is None or section["table_id"] != 0x02:
return None
program_number = section["table_id_ext"]
data = section["raw"]
if len(data) < 12:
return None
pcr_pid = ((data[8] & 0x1F) << 8) | data[9]
prog_info_len = ((data[10] & 0x0F) << 8) | data[11]
offset = 12 + prog_info_len
streams = []
while offset + 5 <= len(data) - 4: # -4 for CRC
stream_type = data[offset]
elementary_pid = ((data[offset + 1] & 0x1F) << 8) | data[offset + 2]
es_info_len = ((data[offset + 3] & 0x0F) << 8) | data[offset + 4]
streams.append({
"stream_type": stream_type,
"elementary_pid": elementary_pid,
"es_info_length": es_info_len,
"type_name": STREAM_TYPES.get(stream_type, f"Unknown (0x{stream_type:02X})"),
})
offset += 5 + es_info_len
return {
"program_number": program_number,
"version": section["version"],
"pcr_pid": pcr_pid,
"streams": streams,
}
def open_input(path: str):
"""Open TS input from a file path or stdin ('-')."""
if path == '-':
return sys.stdin.buffer
if not os.path.exists(path):
print(f"File not found: {path}")
sys.exit(1)
return open(path, 'rb')
def format_pid(pid: int, known: dict = None) -> str:
"""Format a PID with its known name if available."""
if known is None:
known = KNOWN_PIDS
name = known.get(pid, "")
if name:
return f"0x{pid:04X} ({name})"
return f"0x{pid:04X}"
# -- Subcommand handlers --
def cmd_analyze(args: argparse.Namespace) -> None:
"""Full transport stream analysis."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
pid_counts = {}
pid_cc = {} # Last continuity counter per PID
cc_errors = {}
tei_count = 0
scrambled_count = 0
total_packets = 0
first_pcr = None
first_pcr_pkt = 0
last_pcr = None
last_pcr_pkt = 0
print(f"MPEG-2 Transport Stream Analysis")
print(f"{'=' * 60}")
if args.input != '-':
file_size = os.path.getsize(args.input)
print(f"File: {args.input} ({file_size:,} bytes)")
print()
try:
for pkt in reader.iter_packets(max_packets=args.max_packets):
total_packets += 1
pid = pkt.pid
# PID counting
pid_counts[pid] = pid_counts.get(pid, 0) + 1
# TEI
if pkt.tei:
tei_count += 1
# Scrambling
if pkt.scrambling != 0:
scrambled_count += 1
# Continuity counter check (only for PIDs carrying payload)
if pkt.adaptation & 0x01 and pid != 0x1FFF:
if pid in pid_cc:
expected = (pid_cc[pid] + 1) & 0x0F
if pkt.continuity != expected and pkt.continuity != pid_cc[pid]:
cc_errors[pid] = cc_errors.get(pid, 0) + 1
pid_cc[pid] = pkt.continuity
# PCR extraction for bitrate calculation
if pkt.has_pcr():
pcr = pkt.get_pcr()
if pcr >= 0:
if first_pcr is None:
first_pcr = pcr
first_pcr_pkt = total_packets
last_pcr = pcr
last_pcr_pkt = total_packets
except KeyboardInterrupt:
print("\n (interrupted)")
finally:
if source is not sys.stdin.buffer:
source.close()
if total_packets == 0:
print("No valid TS packets found.")
return
# Summary
if reader.sync_offset > 0:
print(f"Sync offset: {reader.sync_offset} bytes (skipped leading garbage)")
print(f"Total packets: {total_packets:,}")
print(f"Total bytes: {total_packets * TS_PACKET_SIZE:,}")
print(f"Unique PIDs: {len(pid_counts)}")
print(f"TEI errors: {tei_count}")
print(f"Scrambled: {scrambled_count}")
# Bitrate
if first_pcr is not None and last_pcr is not None and last_pcr != first_pcr:
pcr_delta = last_pcr - first_pcr
pkt_delta = last_pcr_pkt - first_pcr_pkt
if pcr_delta > 0 and pkt_delta > 0:
duration = pcr_delta / 27_000_000.0 # PCR is 27 MHz clock
byte_count = pkt_delta * TS_PACKET_SIZE
bitrate = (byte_count * 8) / duration
if bitrate >= 1e6:
rate_str = f"{bitrate / 1e6:.2f} Mbps"
else:
rate_str = f"{bitrate / 1e3:.1f} kbps"
print(f"Duration: {duration:.2f}s (from PCR)")
print(f"Bitrate: {rate_str} (PCR-based)")
elif args.input != '-':
# File size estimate
file_size = os.path.getsize(args.input)
print(f"Bitrate: (no PCR found, cannot calculate from timing)")
# PID table
print(f"\n{'=' * 60}")
print(f"PID Distribution")
print(f"{'=' * 60}")
print(f" {'PID':>6} {'Count':>10} {'%':>7} {'CC Err':>6} Name")
print(f" {'---':>6} {'-----':>10} {'--':>7} {'------':>6} ----")
for pid in sorted(pid_counts.keys()):
count = pid_counts[pid]
pct = (count / total_packets) * 100
cc_err = cc_errors.get(pid, 0)
name = KNOWN_PIDS.get(pid, "")
cc_str = str(cc_err) if cc_err > 0 else "-"
print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {cc_str:>6} {name}")
# CC error summary
total_cc_errors = sum(cc_errors.values())
if total_cc_errors > 0:
print(f"\nContinuity errors: {total_cc_errors} total across "
f"{len(cc_errors)} PID(s)")
def cmd_pids(args: argparse.Namespace) -> None:
"""Quick PID summary table."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
pid_counts = {}
total = 0
try:
for pkt in reader.iter_packets(max_packets=args.max_packets):
total += 1
pid_counts[pkt.pid] = pid_counts.get(pkt.pid, 0) + 1
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if total == 0:
print("No TS packets found.")
return
print(f"PID Table ({total:,} packets)")
print(f"{'=' * 50}")
print(f" {'PID':>6} {'Count':>10} {'%':>7} Name")
print(f" {'---':>6} {'-----':>10} {'--':>7} ----")
for pid in sorted(pid_counts.keys()):
count = pid_counts[pid]
pct = (count / total) * 100
name = KNOWN_PIDS.get(pid, "")
print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {name}")
def cmd_pat(args: argparse.Namespace) -> None:
"""Parse and display the Program Association Table."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
psi = PSIParser()
pat_found = False
try:
for pkt in reader.iter_packets():
if pkt.pid != 0x0000:
continue
section = psi.feed(pkt)
if section is None:
continue
pat = parse_pat(section)
if pat is None:
continue
pat_found = True
print(f"Program Association Table (PAT)")
print(f"{'=' * 50}")
print(f" Transport Stream ID: 0x{pat['transport_stream_id']:04X} "
f"({pat['transport_stream_id']})")
print(f" Version: {pat['version']}")
print(f" Programs: {len(pat['programs'])}")
print()
print(f" {'Program':>10} {'PMT PID':>10} Note")
print(f" {'-------':>10} {'-------':>10} ----")
for prog, pmt_pid in sorted(pat['programs'].items()):
note = "NIT" if prog == 0 else ""
print(f" {prog:>10} 0x{pmt_pid:04X} {note}")
break
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if not pat_found:
print("No PAT (PID 0x0000) found in stream.")
def cmd_pmt(args: argparse.Namespace) -> None:
"""Parse and display Program Map Tables."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
psi_pat = PSIParser()
psi_pmt = PSIParser()
pat = None
pmt_pids = set()
pmts_found = {}
try:
for pkt in reader.iter_packets():
# First, collect PAT to learn PMT PIDs
if pkt.pid == 0x0000 and pat is None:
section = psi_pat.feed(pkt)
if section is not None:
pat = parse_pat(section)
if pat is not None:
for prog, pid in pat['programs'].items():
if prog != 0: # Skip NIT reference
pmt_pids.add(pid)
# Then collect PMT sections
if pkt.pid in pmt_pids and pkt.pid not in pmts_found:
section = psi_pmt.feed(pkt)
if section is not None:
pmt = parse_pmt(section)
if pmt is not None:
pmts_found[pkt.pid] = pmt
# Done when we have all PMTs
if pat is not None and len(pmts_found) >= len(pmt_pids):
break
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if pat is None:
print("No PAT found -- cannot locate PMTs.")
return
if not pmts_found:
print("PAT found but no PMT sections could be parsed.")
return
print(f"Program Map Tables")
print(f"{'=' * 60}")
print(f"Transport Stream ID: 0x{pat['transport_stream_id']:04X}")
print()
for pmt_pid in sorted(pmts_found.keys()):
pmt = pmts_found[pmt_pid]
print(f" Program {pmt['program_number']} (PMT PID 0x{pmt_pid:04X}, "
f"version {pmt['version']})")
print(f" PCR PID: 0x{pmt['pcr_pid']:04X}")
print(f" Streams:")
print(f" {'Type':>6} {'PID':>6} Description")
print(f" {'----':>6} {'---':>6} -----------")
for s in pmt['streams']:
print(f" 0x{s['stream_type']:02X} 0x{s['elementary_pid']:04X} "
f"{s['type_name']}")
print()
def cmd_dump(args: argparse.Namespace) -> None:
"""Hex dump of individual TS packets."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
filter_pid = args.pid
max_count = args.count
shown = 0
try:
for pkt in reader.iter_packets():
if filter_pid is not None and pkt.pid != filter_pid:
continue
shown += 1
scrambling_str = ["none", "reserved", "even key", "odd key"][pkt.scrambling]
adapt_str = ["reserved", "payload only", "adapt only", "adapt+payload"][pkt.adaptation]
print(f"Packet #{shown}")
print(f" PID: 0x{pkt.pid:04X} ({KNOWN_PIDS.get(pkt.pid, '')})")
print(f" TEI: {int(pkt.tei)} PUSI: {int(pkt.pusi)} "
f"Priority: {int(pkt.priority)}")
print(f" Scrambling: {scrambling_str} "
f"Adaptation: {adapt_str} CC: {pkt.continuity}")
if pkt.adaptation_field is not None and len(pkt.adaptation_field) > 1:
af_len = pkt.adaptation_field[0]
af_flags = pkt.adaptation_field[1] if len(pkt.adaptation_field) > 1 else 0
flags = []
if af_flags & 0x80: flags.append("discontinuity")
if af_flags & 0x40: flags.append("random_access")
if af_flags & 0x20: flags.append("ES_priority")
if af_flags & 0x10: flags.append("PCR")
if af_flags & 0x08: flags.append("OPCR")
if af_flags & 0x04: flags.append("splice_point")
if af_flags & 0x02: flags.append("private_data")
if af_flags & 0x01: flags.append("extension")
print(f" Adaptation field: {af_len} bytes, "
f"flags=[{', '.join(flags) if flags else 'none'}]")
if pkt.has_pcr():
pcr = pkt.get_pcr()
pcr_secs = pcr / 27_000_000.0
print(f" PCR: {pcr} ({pcr_secs:.6f}s)")
# Hex dump
data = pkt.raw
print(f" Hex:")
for row_off in range(0, len(data), 16):
row = data[row_off:row_off + 16]
hex_part = ' '.join(f'{b:02X}' for b in row)
ascii_part = ''.join(chr(b) if 0x20 <= b < 0x7F else '.' for b in row)
print(f" {row_off:04X}: {hex_part:<48} {ascii_part}")
print()
if max_count and shown >= max_count:
break
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if shown == 0:
if filter_pid is not None:
print(f"No packets found with PID 0x{filter_pid:04X}")
else:
print("No TS packets found.")
def cmd_monitor(args: argparse.Namespace) -> None:
"""Live stream monitoring from stdin or file."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
pid_counts = {}
known_pids = set()
cc_last = {}
cc_errors = 0
tei_count = 0
total_packets = 0
interval_packets = 0
start_time = time.time()
last_report = start_time
print(f"MPEG-2 TS Live Monitor")
print(f"{'=' * 60}")
print(f"Ctrl-C to stop\n")
try:
for pkt in reader.iter_packets():
total_packets += 1
interval_packets += 1
pid = pkt.pid
pid_counts[pid] = pid_counts.get(pid, 0) + 1
# New PID detection
if pid not in known_pids:
known_pids.add(pid)
name = KNOWN_PIDS.get(pid, "")
label = f" ({name})" if name else ""
elapsed = time.time() - start_time
print(f" [{elapsed:>7.1f}s] New PID: 0x{pid:04X}{label}")
# TEI
if pkt.tei:
tei_count += 1
elapsed = time.time() - start_time
print(f" [{elapsed:>7.1f}s] TEI error on PID 0x{pid:04X}")
# CC check
if pkt.adaptation & 0x01 and pid != 0x1FFF:
if pid in cc_last:
expected = (cc_last[pid] + 1) & 0x0F
if pkt.continuity != expected and pkt.continuity != cc_last[pid]:
cc_errors += 1
elapsed = time.time() - start_time
print(f" [{elapsed:>7.1f}s] CC error PID 0x{pid:04X}: "
f"expected {expected}, got {pkt.continuity}")
cc_last[pid] = pkt.continuity
# Periodic status
now = time.time()
if now - last_report >= 1.0:
elapsed = now - start_time
total_bytes = total_packets * TS_PACKET_SIZE
bitrate = (interval_packets * TS_PACKET_SIZE * 8)
if bitrate >= 1e6:
rate_str = f"{bitrate / 1e6:.2f} Mbps"
else:
rate_str = f"{bitrate / 1e3:.1f} kbps"
sys.stderr.write(
f"\r {total_packets:>10,} pkts "
f"{total_bytes:>12,} bytes "
f"{rate_str:>12} "
f"PIDs:{len(known_pids):>3} "
f"CCerr:{cc_errors} "
f"TEI:{tei_count} "
f"({elapsed:.0f}s) "
)
sys.stderr.flush()
interval_packets = 0
last_report = now
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
elapsed = time.time() - start_time
total_bytes = total_packets * TS_PACKET_SIZE
print(f"\n\nMonitor Summary")
print(f"{'=' * 40}")
print(f" Duration: {elapsed:.1f}s")
print(f" Packets: {total_packets:,}")
print(f" Bytes: {total_bytes:,}")
print(f" Unique PIDs: {len(known_pids)}")
print(f" CC errors: {cc_errors}")
print(f" TEI errors: {tei_count}")
if elapsed > 0:
avg_bitrate = (total_bytes * 8) / elapsed
if avg_bitrate >= 1e6:
print(f" Avg bitrate: {avg_bitrate / 1e6:.2f} Mbps")
else:
print(f" Avg bitrate: {avg_bitrate / 1e3:.1f} kbps")
# -- CLI --
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="MPEG-2 Transport Stream analyzer for Genpix SkyWalker-1",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s capture.ts
%(prog)s analyze capture.ts
%(prog)s pids capture.ts
%(prog)s pat capture.ts
%(prog)s pmt capture.ts
%(prog)s dump capture.ts --pid 0x100 --count 5
%(prog)s monitor -
tune.py stream --stdout | %(prog)s monitor -
""")
parser.add_argument('-v', '--verbose', action='store_true',
help="Show sync search details and debug info")
sub = parser.add_subparsers(dest='command')
# analyze (default)
p_analyze = sub.add_parser('analyze', help="Full stream analysis (default)")
p_analyze.add_argument('input', help="TS file path or '-' for stdin")
p_analyze.add_argument('--max-packets', type=int, default=0,
help="Max packets to analyze (0 = all)")
# pids
p_pids = sub.add_parser('pids', help="Quick PID summary table")
p_pids.add_argument('input', help="TS file path or '-' for stdin")
p_pids.add_argument('--max-packets', type=int, default=0,
help="Max packets to analyze (0 = all)")
# pat
p_pat = sub.add_parser('pat', help="Parse Program Association Table")
p_pat.add_argument('input', help="TS file path or '-' for stdin")
# pmt
p_pmt = sub.add_parser('pmt', help="Parse Program Map Tables")
p_pmt.add_argument('input', help="TS file path or '-' for stdin")
# dump
p_dump = sub.add_parser('dump', help="Hex dump of TS packets")
p_dump.add_argument('input', help="TS file path or '-' for stdin")
p_dump.add_argument('--pid', type=lambda x: int(x, 0), default=None,
help="Filter by PID (hex: 0x100 or decimal: 256)")
p_dump.add_argument('--count', type=int, default=10,
help="Max packets to dump (default: 10)")
# monitor
p_monitor = sub.add_parser('monitor', help="Live stream monitoring")
p_monitor.add_argument('input', help="TS file path or '-' for stdin")
return parser
def main():
parser = build_parser()
# Handle bare filename without subcommand: default to 'analyze'
# Insert 'analyze' after any global flags but before the filename
subcmds = {'analyze', 'pids', 'pat', 'pmt', 'dump', 'monitor'}
argv = sys.argv[1:]
if argv:
first_pos = None
insert_idx = 0
for i, a in enumerate(argv):
if not a.startswith('-'):
first_pos = a
insert_idx = i
break
# Skip flag and its value if it takes one (currently none do)
insert_idx = i + 1
if first_pos is not None and first_pos not in subcmds:
argv.insert(insert_idx, 'analyze')
args = parser.parse_args(argv)
if not args.command:
parser.print_help()
sys.exit(1)
dispatch = {
'analyze': cmd_analyze,
'pids': cmd_pids,
'pat': cmd_pat,
'pmt': cmd_pmt,
'dump': cmd_dump,
'monitor': cmd_monitor,
}
handler = dispatch.get(args.command)
if handler is None:
parser.print_help()
sys.exit(1)
handler(args)
if __name__ == '__main__':
main()

View File

@ -751,7 +751,9 @@ examples:
sub = parser.add_subparsers(dest='command') sub = parser.add_subparsers(dest='command')
# status # status
sub.add_parser('status', help="Show device config, FW version, signal status") p_status = sub.add_parser('status', help="Show device config, FW version, signal status")
p_status.add_argument('--json', action='store_true', default=False,
help="Output machine-readable JSON")
# tune # tune
p_tune = sub.add_parser('tune', help="Tune to a transponder") p_tune = sub.add_parser('tune', help="Tune to a transponder")
@ -774,6 +776,8 @@ examples:
help="Signal lock timeout in seconds (default: 10)") help="Signal lock timeout in seconds (default: 10)")
p_tune.add_argument('--extra-volt', action='store_true', p_tune.add_argument('--extra-volt', action='store_true',
help="Enable +1V LNB voltage boost for long cables") help="Enable +1V LNB voltage boost for long cables")
p_tune.add_argument('--json', action='store_true', default=False,
help="Output machine-readable JSON")
# stream # stream
p_stream = sub.add_parser('stream', help="Stream MPEG-2 TS data") p_stream = sub.add_parser('stream', help="Stream MPEG-2 TS data")