New tools: - tools/eeprom_write.py: EEPROM firmware flash with backup, verify, dry-run - tools/ts_analyze.py: MPEG-2 transport stream analyzer with PAT/PMT parsing DVB-S2 investigation confirms BCM4500 hardware limitation (no LDPC/BCH silicon). Fix --json flag on tune.py subcommands (argparse parent/child scoping). All tools verified against live SkyWalker-1 hardware.
898 lines
30 KiB
Python
Executable File
898 lines
30 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Genpix SkyWalker-1 MPEG-2 Transport Stream analyzer.
|
|
|
|
Parses and analyzes 188-byte MPEG-2 TS packets from .ts files captured
|
|
by tune.py, stdin pipes, or any standard transport stream source.
|
|
|
|
Supports PID analysis, PAT/PMT parsing, continuity counter checking,
|
|
scrambling detection, hex packet dumps, and live stream monitoring.
|
|
|
|
Reference: ISO/IEC 13818-1 (MPEG-2 Systems)
|
|
TS packet: 188 bytes, sync byte 0x47
|
|
"""
|
|
|
|
import sys
|
|
import struct
|
|
import argparse
|
|
import time
|
|
import os
|
|
|
|
TS_PACKET_SIZE = 188
|
|
TS_SYNC_BYTE = 0x47
|
|
|
|
# Well-known PID assignments (ISO 13818-1 Table 2-3)
|
|
KNOWN_PIDS = {
|
|
0x0000: "PAT",
|
|
0x0001: "CAT",
|
|
0x0002: "TSDT",
|
|
0x0010: "NIT/ST",
|
|
0x0011: "SDT/BAT/ST",
|
|
0x0012: "EIT/ST",
|
|
0x0013: "RST/ST",
|
|
0x0014: "TDT/TOT/ST",
|
|
0x001E: "DIT",
|
|
0x001F: "SIT",
|
|
0x1FFF: "Null",
|
|
}
|
|
|
|
# Stream type identifiers (ISO 13818-1 Table 2-36)
|
|
STREAM_TYPES = {
|
|
0x00: "Reserved",
|
|
0x01: "MPEG-1 Video (11172-2)",
|
|
0x02: "MPEG-2 Video (13818-2)",
|
|
0x03: "MPEG-1 Audio (11172-3)",
|
|
0x04: "MPEG-2 Audio (13818-3)",
|
|
0x05: "Private Sections (13818-1)",
|
|
0x06: "PES Private Data",
|
|
0x07: "MHEG",
|
|
0x08: "DSM-CC",
|
|
0x09: "H.222.1",
|
|
0x0A: "DSM-CC Type A",
|
|
0x0B: "DSM-CC Type B",
|
|
0x0C: "DSM-CC Type C",
|
|
0x0D: "DSM-CC Type D",
|
|
0x0E: "Auxiliary",
|
|
0x0F: "MPEG-2 AAC Audio",
|
|
0x10: "MPEG-4 Visual",
|
|
0x11: "MPEG-4 AAC Audio (LATM)",
|
|
0x15: "Metadata in PES",
|
|
0x1B: "H.264/AVC Video",
|
|
0x24: "H.265/HEVC Video",
|
|
0x42: "AVS Video",
|
|
0x81: "AC-3 Audio (ATSC)",
|
|
0x82: "DTS Audio",
|
|
0x83: "Dolby TrueHD",
|
|
0x84: "Dolby Digital Plus (EAC-3)",
|
|
0x85: "DTS-HD",
|
|
0x86: "DTS-HD Master Audio",
|
|
0x87: "EAC-3 Audio (ATSC)",
|
|
0xEA: "VC-1 Video",
|
|
}
|
|
|
|
|
|
class TSPacket:
|
|
"""Parsed MPEG-2 transport stream packet header."""
|
|
|
|
__slots__ = (
|
|
'sync', 'tei', 'pusi', 'priority', 'pid',
|
|
'scrambling', 'adaptation', 'continuity',
|
|
'adaptation_field', 'payload', 'raw',
|
|
)
|
|
|
|
def __init__(self, data: bytes):
|
|
if len(data) != TS_PACKET_SIZE:
|
|
raise ValueError(f"Packet must be {TS_PACKET_SIZE} bytes, got {len(data)}")
|
|
|
|
self.raw = data
|
|
self.sync = data[0]
|
|
self.tei = bool(data[1] & 0x80)
|
|
self.pusi = bool(data[1] & 0x40)
|
|
self.priority = bool(data[1] & 0x20)
|
|
self.pid = ((data[1] & 0x1F) << 8) | data[2]
|
|
self.scrambling = (data[3] >> 6) & 0x03
|
|
self.adaptation = (data[3] >> 4) & 0x03
|
|
self.continuity = data[3] & 0x0F
|
|
|
|
# Parse adaptation field and payload boundaries
|
|
offset = 4
|
|
self.adaptation_field = None
|
|
self.payload = None
|
|
|
|
if self.adaptation & 0x02:
|
|
# Adaptation field present
|
|
if offset < TS_PACKET_SIZE:
|
|
af_len = data[offset]
|
|
af_end = offset + 1 + af_len
|
|
if af_end <= TS_PACKET_SIZE:
|
|
self.adaptation_field = data[offset:af_end]
|
|
offset = af_end
|
|
|
|
if self.adaptation & 0x01:
|
|
# Payload present
|
|
if offset < TS_PACKET_SIZE:
|
|
self.payload = data[offset:]
|
|
|
|
def has_pcr(self) -> bool:
|
|
"""Check if adaptation field contains a PCR."""
|
|
if self.adaptation_field is None or len(self.adaptation_field) < 7:
|
|
return False
|
|
af_flags = self.adaptation_field[1] if len(self.adaptation_field) > 1 else 0
|
|
return bool(af_flags & 0x10)
|
|
|
|
def get_pcr(self) -> int:
|
|
"""Extract PCR value (in 27 MHz clock ticks). Returns -1 if no PCR."""
|
|
if not self.has_pcr():
|
|
return -1
|
|
# PCR is 6 bytes starting at adaptation_field[2]
|
|
af = self.adaptation_field
|
|
pcr_base = (af[2] << 25) | (af[3] << 17) | (af[4] << 9) | \
|
|
(af[5] << 1) | ((af[6] >> 7) & 0x01)
|
|
pcr_ext = ((af[6] & 0x01) << 8) | af[7]
|
|
return pcr_base * 300 + pcr_ext
|
|
|
|
|
|
class TSReader:
|
|
"""Reads TS packets from a file or stream, handling sync alignment."""
|
|
|
|
def __init__(self, source, verbose: bool = False):
|
|
self.source = source
|
|
self.verbose = verbose
|
|
self.offset = 0
|
|
self._sync_offset = -1
|
|
|
|
def find_sync(self, data: bytes) -> int:
|
|
"""Find sync byte alignment in raw data. Returns byte offset or -1."""
|
|
# Need at least 3 consecutive sync bytes to confirm alignment
|
|
for i in range(min(len(data), TS_PACKET_SIZE)):
|
|
if data[i] != TS_SYNC_BYTE:
|
|
continue
|
|
# Check for consecutive sync bytes at 188-byte intervals
|
|
ok = True
|
|
for check in range(1, 4):
|
|
pos = i + check * TS_PACKET_SIZE
|
|
if pos >= len(data):
|
|
# Not enough data to confirm, accept if at least one more matches
|
|
if check >= 2:
|
|
break
|
|
ok = False
|
|
break
|
|
if data[pos] != TS_SYNC_BYTE:
|
|
ok = False
|
|
break
|
|
if ok:
|
|
return i
|
|
return -1
|
|
|
|
def iter_packets(self, max_packets: int = 0):
|
|
"""Yield TSPacket objects from the source."""
|
|
buf = b''
|
|
synced = False
|
|
count = 0
|
|
|
|
while True:
|
|
chunk = self.source.read(65536)
|
|
if not chunk:
|
|
break
|
|
buf += chunk
|
|
|
|
if not synced:
|
|
sync_off = self.find_sync(buf)
|
|
if sync_off < 0:
|
|
# Keep last 187 bytes in case sync straddles chunk boundary
|
|
if len(buf) > TS_PACKET_SIZE * 4:
|
|
buf = buf[-(TS_PACKET_SIZE - 1):]
|
|
continue
|
|
self._sync_offset = sync_off + self.offset
|
|
if self.verbose and sync_off > 0:
|
|
print(f" Sync found at byte offset {sync_off}", file=sys.stderr)
|
|
buf = buf[sync_off:]
|
|
synced = True
|
|
|
|
while len(buf) >= TS_PACKET_SIZE:
|
|
pkt_data = buf[:TS_PACKET_SIZE]
|
|
buf = buf[TS_PACKET_SIZE:]
|
|
|
|
if pkt_data[0] != TS_SYNC_BYTE:
|
|
# Lost sync, try to re-acquire
|
|
synced = False
|
|
if self.verbose:
|
|
print(f" Sync lost, re-scanning...", file=sys.stderr)
|
|
break
|
|
|
|
count += 1
|
|
yield TSPacket(pkt_data)
|
|
|
|
if max_packets and count >= max_packets:
|
|
return
|
|
|
|
self.offset += len(chunk)
|
|
|
|
@property
|
|
def sync_offset(self) -> int:
|
|
return self._sync_offset
|
|
|
|
|
|
class PSIParser:
|
|
"""Parse PSI sections from TS packet payloads."""
|
|
|
|
def __init__(self):
|
|
self._section_bufs = {} # pid -> accumulated bytes
|
|
|
|
def feed(self, pkt: TSPacket) -> dict:
|
|
"""Feed a packet, return parsed section dict or None."""
|
|
if pkt.payload is None:
|
|
return None
|
|
|
|
pid = pkt.pid
|
|
payload = pkt.payload
|
|
|
|
if pkt.pusi:
|
|
# Payload Unit Start Indicator set
|
|
if len(payload) < 1:
|
|
return None
|
|
pointer = payload[0]
|
|
payload = payload[1 + pointer:]
|
|
self._section_bufs[pid] = payload
|
|
elif pid in self._section_bufs:
|
|
self._section_bufs[pid] += payload
|
|
else:
|
|
return None
|
|
|
|
return self._try_parse(pid)
|
|
|
|
def _try_parse(self, pid: int) -> dict:
|
|
"""Try to parse a complete section from the buffer."""
|
|
buf = self._section_bufs.get(pid, b'')
|
|
if len(buf) < 3:
|
|
return None
|
|
|
|
table_id = buf[0]
|
|
section_length = ((buf[1] & 0x0F) << 8) | buf[2]
|
|
total_len = 3 + section_length
|
|
|
|
if len(buf) < total_len:
|
|
return None # Incomplete, wait for more data
|
|
|
|
section = buf[:total_len]
|
|
# Clear buffer for next section
|
|
self._section_bufs[pid] = buf[total_len:]
|
|
|
|
if section_length < 5:
|
|
return None
|
|
|
|
result = {
|
|
"table_id": table_id,
|
|
"section_syntax": bool(buf[1] & 0x80),
|
|
"section_length": section_length,
|
|
"raw": section,
|
|
}
|
|
|
|
if result["section_syntax"]:
|
|
result["table_id_ext"] = (section[3] << 8) | section[4]
|
|
result["version"] = (section[5] >> 1) & 0x1F
|
|
result["current_next"] = section[5] & 0x01
|
|
result["section_number"] = section[6]
|
|
result["last_section_number"] = section[7]
|
|
result["data"] = section[8:-4]
|
|
result["crc32"] = struct.unpack_from('>I', section, total_len - 4)[0]
|
|
|
|
return result
|
|
|
|
|
|
def parse_pat(section: dict) -> dict:
|
|
"""Parse a Program Association Table section."""
|
|
if section is None or section["table_id"] != 0x00:
|
|
return None
|
|
|
|
transport_stream_id = section["table_id_ext"]
|
|
data = section["data"]
|
|
programs = {}
|
|
|
|
for i in range(0, len(data), 4):
|
|
if i + 4 > len(data):
|
|
break
|
|
prog_num = (data[i] << 8) | data[i + 1]
|
|
pmt_pid = ((data[i + 2] & 0x1F) << 8) | data[i + 3]
|
|
programs[prog_num] = pmt_pid
|
|
|
|
return {
|
|
"transport_stream_id": transport_stream_id,
|
|
"version": section["version"],
|
|
"programs": programs,
|
|
}
|
|
|
|
|
|
def parse_pmt(section: dict) -> dict:
|
|
"""Parse a Program Map Table section."""
|
|
if section is None or section["table_id"] != 0x02:
|
|
return None
|
|
|
|
program_number = section["table_id_ext"]
|
|
data = section["raw"]
|
|
|
|
if len(data) < 12:
|
|
return None
|
|
|
|
pcr_pid = ((data[8] & 0x1F) << 8) | data[9]
|
|
prog_info_len = ((data[10] & 0x0F) << 8) | data[11]
|
|
|
|
offset = 12 + prog_info_len
|
|
streams = []
|
|
|
|
while offset + 5 <= len(data) - 4: # -4 for CRC
|
|
stream_type = data[offset]
|
|
elementary_pid = ((data[offset + 1] & 0x1F) << 8) | data[offset + 2]
|
|
es_info_len = ((data[offset + 3] & 0x0F) << 8) | data[offset + 4]
|
|
|
|
streams.append({
|
|
"stream_type": stream_type,
|
|
"elementary_pid": elementary_pid,
|
|
"es_info_length": es_info_len,
|
|
"type_name": STREAM_TYPES.get(stream_type, f"Unknown (0x{stream_type:02X})"),
|
|
})
|
|
offset += 5 + es_info_len
|
|
|
|
return {
|
|
"program_number": program_number,
|
|
"version": section["version"],
|
|
"pcr_pid": pcr_pid,
|
|
"streams": streams,
|
|
}
|
|
|
|
|
|
def open_input(path: str):
|
|
"""Open TS input from a file path or stdin ('-')."""
|
|
if path == '-':
|
|
return sys.stdin.buffer
|
|
if not os.path.exists(path):
|
|
print(f"File not found: {path}")
|
|
sys.exit(1)
|
|
return open(path, 'rb')
|
|
|
|
|
|
def format_pid(pid: int, known: dict = None) -> str:
|
|
"""Format a PID with its known name if available."""
|
|
if known is None:
|
|
known = KNOWN_PIDS
|
|
name = known.get(pid, "")
|
|
if name:
|
|
return f"0x{pid:04X} ({name})"
|
|
return f"0x{pid:04X}"
|
|
|
|
|
|
# -- Subcommand handlers --
|
|
|
|
def cmd_analyze(args: argparse.Namespace) -> None:
|
|
"""Full transport stream analysis."""
|
|
source = open_input(args.input)
|
|
reader = TSReader(source, verbose=args.verbose)
|
|
|
|
pid_counts = {}
|
|
pid_cc = {} # Last continuity counter per PID
|
|
cc_errors = {}
|
|
tei_count = 0
|
|
scrambled_count = 0
|
|
total_packets = 0
|
|
first_pcr = None
|
|
first_pcr_pkt = 0
|
|
last_pcr = None
|
|
last_pcr_pkt = 0
|
|
|
|
print(f"MPEG-2 Transport Stream Analysis")
|
|
print(f"{'=' * 60}")
|
|
if args.input != '-':
|
|
file_size = os.path.getsize(args.input)
|
|
print(f"File: {args.input} ({file_size:,} bytes)")
|
|
print()
|
|
|
|
try:
|
|
for pkt in reader.iter_packets(max_packets=args.max_packets):
|
|
total_packets += 1
|
|
pid = pkt.pid
|
|
|
|
# PID counting
|
|
pid_counts[pid] = pid_counts.get(pid, 0) + 1
|
|
|
|
# TEI
|
|
if pkt.tei:
|
|
tei_count += 1
|
|
|
|
# Scrambling
|
|
if pkt.scrambling != 0:
|
|
scrambled_count += 1
|
|
|
|
# Continuity counter check (only for PIDs carrying payload)
|
|
if pkt.adaptation & 0x01 and pid != 0x1FFF:
|
|
if pid in pid_cc:
|
|
expected = (pid_cc[pid] + 1) & 0x0F
|
|
if pkt.continuity != expected and pkt.continuity != pid_cc[pid]:
|
|
cc_errors[pid] = cc_errors.get(pid, 0) + 1
|
|
pid_cc[pid] = pkt.continuity
|
|
|
|
# PCR extraction for bitrate calculation
|
|
if pkt.has_pcr():
|
|
pcr = pkt.get_pcr()
|
|
if pcr >= 0:
|
|
if first_pcr is None:
|
|
first_pcr = pcr
|
|
first_pcr_pkt = total_packets
|
|
last_pcr = pcr
|
|
last_pcr_pkt = total_packets
|
|
except KeyboardInterrupt:
|
|
print("\n (interrupted)")
|
|
finally:
|
|
if source is not sys.stdin.buffer:
|
|
source.close()
|
|
|
|
if total_packets == 0:
|
|
print("No valid TS packets found.")
|
|
return
|
|
|
|
# Summary
|
|
if reader.sync_offset > 0:
|
|
print(f"Sync offset: {reader.sync_offset} bytes (skipped leading garbage)")
|
|
print(f"Total packets: {total_packets:,}")
|
|
print(f"Total bytes: {total_packets * TS_PACKET_SIZE:,}")
|
|
print(f"Unique PIDs: {len(pid_counts)}")
|
|
print(f"TEI errors: {tei_count}")
|
|
print(f"Scrambled: {scrambled_count}")
|
|
|
|
# Bitrate
|
|
if first_pcr is not None and last_pcr is not None and last_pcr != first_pcr:
|
|
pcr_delta = last_pcr - first_pcr
|
|
pkt_delta = last_pcr_pkt - first_pcr_pkt
|
|
if pcr_delta > 0 and pkt_delta > 0:
|
|
duration = pcr_delta / 27_000_000.0 # PCR is 27 MHz clock
|
|
byte_count = pkt_delta * TS_PACKET_SIZE
|
|
bitrate = (byte_count * 8) / duration
|
|
if bitrate >= 1e6:
|
|
rate_str = f"{bitrate / 1e6:.2f} Mbps"
|
|
else:
|
|
rate_str = f"{bitrate / 1e3:.1f} kbps"
|
|
print(f"Duration: {duration:.2f}s (from PCR)")
|
|
print(f"Bitrate: {rate_str} (PCR-based)")
|
|
elif args.input != '-':
|
|
# File size estimate
|
|
file_size = os.path.getsize(args.input)
|
|
print(f"Bitrate: (no PCR found, cannot calculate from timing)")
|
|
|
|
# PID table
|
|
print(f"\n{'=' * 60}")
|
|
print(f"PID Distribution")
|
|
print(f"{'=' * 60}")
|
|
print(f" {'PID':>6} {'Count':>10} {'%':>7} {'CC Err':>6} Name")
|
|
print(f" {'---':>6} {'-----':>10} {'--':>7} {'------':>6} ----")
|
|
|
|
for pid in sorted(pid_counts.keys()):
|
|
count = pid_counts[pid]
|
|
pct = (count / total_packets) * 100
|
|
cc_err = cc_errors.get(pid, 0)
|
|
name = KNOWN_PIDS.get(pid, "")
|
|
cc_str = str(cc_err) if cc_err > 0 else "-"
|
|
print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {cc_str:>6} {name}")
|
|
|
|
# CC error summary
|
|
total_cc_errors = sum(cc_errors.values())
|
|
if total_cc_errors > 0:
|
|
print(f"\nContinuity errors: {total_cc_errors} total across "
|
|
f"{len(cc_errors)} PID(s)")
|
|
|
|
|
|
def cmd_pids(args: argparse.Namespace) -> None:
|
|
"""Quick PID summary table."""
|
|
source = open_input(args.input)
|
|
reader = TSReader(source, verbose=args.verbose)
|
|
|
|
pid_counts = {}
|
|
total = 0
|
|
|
|
try:
|
|
for pkt in reader.iter_packets(max_packets=args.max_packets):
|
|
total += 1
|
|
pid_counts[pkt.pid] = pid_counts.get(pkt.pid, 0) + 1
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
if source is not sys.stdin.buffer:
|
|
source.close()
|
|
|
|
if total == 0:
|
|
print("No TS packets found.")
|
|
return
|
|
|
|
print(f"PID Table ({total:,} packets)")
|
|
print(f"{'=' * 50}")
|
|
print(f" {'PID':>6} {'Count':>10} {'%':>7} Name")
|
|
print(f" {'---':>6} {'-----':>10} {'--':>7} ----")
|
|
|
|
for pid in sorted(pid_counts.keys()):
|
|
count = pid_counts[pid]
|
|
pct = (count / total) * 100
|
|
name = KNOWN_PIDS.get(pid, "")
|
|
print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {name}")
|
|
|
|
|
|
def cmd_pat(args: argparse.Namespace) -> None:
|
|
"""Parse and display the Program Association Table."""
|
|
source = open_input(args.input)
|
|
reader = TSReader(source, verbose=args.verbose)
|
|
psi = PSIParser()
|
|
pat_found = False
|
|
|
|
try:
|
|
for pkt in reader.iter_packets():
|
|
if pkt.pid != 0x0000:
|
|
continue
|
|
section = psi.feed(pkt)
|
|
if section is None:
|
|
continue
|
|
pat = parse_pat(section)
|
|
if pat is None:
|
|
continue
|
|
|
|
pat_found = True
|
|
print(f"Program Association Table (PAT)")
|
|
print(f"{'=' * 50}")
|
|
print(f" Transport Stream ID: 0x{pat['transport_stream_id']:04X} "
|
|
f"({pat['transport_stream_id']})")
|
|
print(f" Version: {pat['version']}")
|
|
print(f" Programs: {len(pat['programs'])}")
|
|
print()
|
|
print(f" {'Program':>10} {'PMT PID':>10} Note")
|
|
print(f" {'-------':>10} {'-------':>10} ----")
|
|
for prog, pmt_pid in sorted(pat['programs'].items()):
|
|
note = "NIT" if prog == 0 else ""
|
|
print(f" {prog:>10} 0x{pmt_pid:04X} {note}")
|
|
break
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
if source is not sys.stdin.buffer:
|
|
source.close()
|
|
|
|
if not pat_found:
|
|
print("No PAT (PID 0x0000) found in stream.")
|
|
|
|
|
|
def cmd_pmt(args: argparse.Namespace) -> None:
|
|
"""Parse and display Program Map Tables."""
|
|
source = open_input(args.input)
|
|
reader = TSReader(source, verbose=args.verbose)
|
|
psi_pat = PSIParser()
|
|
psi_pmt = PSIParser()
|
|
|
|
pat = None
|
|
pmt_pids = set()
|
|
pmts_found = {}
|
|
|
|
try:
|
|
for pkt in reader.iter_packets():
|
|
# First, collect PAT to learn PMT PIDs
|
|
if pkt.pid == 0x0000 and pat is None:
|
|
section = psi_pat.feed(pkt)
|
|
if section is not None:
|
|
pat = parse_pat(section)
|
|
if pat is not None:
|
|
for prog, pid in pat['programs'].items():
|
|
if prog != 0: # Skip NIT reference
|
|
pmt_pids.add(pid)
|
|
|
|
# Then collect PMT sections
|
|
if pkt.pid in pmt_pids and pkt.pid not in pmts_found:
|
|
section = psi_pmt.feed(pkt)
|
|
if section is not None:
|
|
pmt = parse_pmt(section)
|
|
if pmt is not None:
|
|
pmts_found[pkt.pid] = pmt
|
|
|
|
# Done when we have all PMTs
|
|
if pat is not None and len(pmts_found) >= len(pmt_pids):
|
|
break
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
if source is not sys.stdin.buffer:
|
|
source.close()
|
|
|
|
if pat is None:
|
|
print("No PAT found -- cannot locate PMTs.")
|
|
return
|
|
|
|
if not pmts_found:
|
|
print("PAT found but no PMT sections could be parsed.")
|
|
return
|
|
|
|
print(f"Program Map Tables")
|
|
print(f"{'=' * 60}")
|
|
print(f"Transport Stream ID: 0x{pat['transport_stream_id']:04X}")
|
|
print()
|
|
|
|
for pmt_pid in sorted(pmts_found.keys()):
|
|
pmt = pmts_found[pmt_pid]
|
|
print(f" Program {pmt['program_number']} (PMT PID 0x{pmt_pid:04X}, "
|
|
f"version {pmt['version']})")
|
|
print(f" PCR PID: 0x{pmt['pcr_pid']:04X}")
|
|
print(f" Streams:")
|
|
print(f" {'Type':>6} {'PID':>6} Description")
|
|
print(f" {'----':>6} {'---':>6} -----------")
|
|
for s in pmt['streams']:
|
|
print(f" 0x{s['stream_type']:02X} 0x{s['elementary_pid']:04X} "
|
|
f"{s['type_name']}")
|
|
print()
|
|
|
|
|
|
def cmd_dump(args: argparse.Namespace) -> None:
|
|
"""Hex dump of individual TS packets."""
|
|
source = open_input(args.input)
|
|
reader = TSReader(source, verbose=args.verbose)
|
|
filter_pid = args.pid
|
|
max_count = args.count
|
|
shown = 0
|
|
|
|
try:
|
|
for pkt in reader.iter_packets():
|
|
if filter_pid is not None and pkt.pid != filter_pid:
|
|
continue
|
|
|
|
shown += 1
|
|
scrambling_str = ["none", "reserved", "even key", "odd key"][pkt.scrambling]
|
|
adapt_str = ["reserved", "payload only", "adapt only", "adapt+payload"][pkt.adaptation]
|
|
|
|
print(f"Packet #{shown}")
|
|
print(f" PID: 0x{pkt.pid:04X} ({KNOWN_PIDS.get(pkt.pid, '')})")
|
|
print(f" TEI: {int(pkt.tei)} PUSI: {int(pkt.pusi)} "
|
|
f"Priority: {int(pkt.priority)}")
|
|
print(f" Scrambling: {scrambling_str} "
|
|
f"Adaptation: {adapt_str} CC: {pkt.continuity}")
|
|
|
|
if pkt.adaptation_field is not None and len(pkt.adaptation_field) > 1:
|
|
af_len = pkt.adaptation_field[0]
|
|
af_flags = pkt.adaptation_field[1] if len(pkt.adaptation_field) > 1 else 0
|
|
flags = []
|
|
if af_flags & 0x80: flags.append("discontinuity")
|
|
if af_flags & 0x40: flags.append("random_access")
|
|
if af_flags & 0x20: flags.append("ES_priority")
|
|
if af_flags & 0x10: flags.append("PCR")
|
|
if af_flags & 0x08: flags.append("OPCR")
|
|
if af_flags & 0x04: flags.append("splice_point")
|
|
if af_flags & 0x02: flags.append("private_data")
|
|
if af_flags & 0x01: flags.append("extension")
|
|
print(f" Adaptation field: {af_len} bytes, "
|
|
f"flags=[{', '.join(flags) if flags else 'none'}]")
|
|
if pkt.has_pcr():
|
|
pcr = pkt.get_pcr()
|
|
pcr_secs = pcr / 27_000_000.0
|
|
print(f" PCR: {pcr} ({pcr_secs:.6f}s)")
|
|
|
|
# Hex dump
|
|
data = pkt.raw
|
|
print(f" Hex:")
|
|
for row_off in range(0, len(data), 16):
|
|
row = data[row_off:row_off + 16]
|
|
hex_part = ' '.join(f'{b:02X}' for b in row)
|
|
ascii_part = ''.join(chr(b) if 0x20 <= b < 0x7F else '.' for b in row)
|
|
print(f" {row_off:04X}: {hex_part:<48} {ascii_part}")
|
|
print()
|
|
|
|
if max_count and shown >= max_count:
|
|
break
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
if source is not sys.stdin.buffer:
|
|
source.close()
|
|
|
|
if shown == 0:
|
|
if filter_pid is not None:
|
|
print(f"No packets found with PID 0x{filter_pid:04X}")
|
|
else:
|
|
print("No TS packets found.")
|
|
|
|
|
|
def cmd_monitor(args: argparse.Namespace) -> None:
|
|
"""Live stream monitoring from stdin or file."""
|
|
source = open_input(args.input)
|
|
reader = TSReader(source, verbose=args.verbose)
|
|
|
|
pid_counts = {}
|
|
known_pids = set()
|
|
cc_last = {}
|
|
cc_errors = 0
|
|
tei_count = 0
|
|
total_packets = 0
|
|
interval_packets = 0
|
|
start_time = time.time()
|
|
last_report = start_time
|
|
|
|
print(f"MPEG-2 TS Live Monitor")
|
|
print(f"{'=' * 60}")
|
|
print(f"Ctrl-C to stop\n")
|
|
|
|
try:
|
|
for pkt in reader.iter_packets():
|
|
total_packets += 1
|
|
interval_packets += 1
|
|
pid = pkt.pid
|
|
|
|
pid_counts[pid] = pid_counts.get(pid, 0) + 1
|
|
|
|
# New PID detection
|
|
if pid not in known_pids:
|
|
known_pids.add(pid)
|
|
name = KNOWN_PIDS.get(pid, "")
|
|
label = f" ({name})" if name else ""
|
|
elapsed = time.time() - start_time
|
|
print(f" [{elapsed:>7.1f}s] New PID: 0x{pid:04X}{label}")
|
|
|
|
# TEI
|
|
if pkt.tei:
|
|
tei_count += 1
|
|
elapsed = time.time() - start_time
|
|
print(f" [{elapsed:>7.1f}s] TEI error on PID 0x{pid:04X}")
|
|
|
|
# CC check
|
|
if pkt.adaptation & 0x01 and pid != 0x1FFF:
|
|
if pid in cc_last:
|
|
expected = (cc_last[pid] + 1) & 0x0F
|
|
if pkt.continuity != expected and pkt.continuity != cc_last[pid]:
|
|
cc_errors += 1
|
|
elapsed = time.time() - start_time
|
|
print(f" [{elapsed:>7.1f}s] CC error PID 0x{pid:04X}: "
|
|
f"expected {expected}, got {pkt.continuity}")
|
|
cc_last[pid] = pkt.continuity
|
|
|
|
# Periodic status
|
|
now = time.time()
|
|
if now - last_report >= 1.0:
|
|
elapsed = now - start_time
|
|
total_bytes = total_packets * TS_PACKET_SIZE
|
|
bitrate = (interval_packets * TS_PACKET_SIZE * 8)
|
|
if bitrate >= 1e6:
|
|
rate_str = f"{bitrate / 1e6:.2f} Mbps"
|
|
else:
|
|
rate_str = f"{bitrate / 1e3:.1f} kbps"
|
|
sys.stderr.write(
|
|
f"\r {total_packets:>10,} pkts "
|
|
f"{total_bytes:>12,} bytes "
|
|
f"{rate_str:>12} "
|
|
f"PIDs:{len(known_pids):>3} "
|
|
f"CCerr:{cc_errors} "
|
|
f"TEI:{tei_count} "
|
|
f"({elapsed:.0f}s) "
|
|
)
|
|
sys.stderr.flush()
|
|
interval_packets = 0
|
|
last_report = now
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
if source is not sys.stdin.buffer:
|
|
source.close()
|
|
|
|
elapsed = time.time() - start_time
|
|
total_bytes = total_packets * TS_PACKET_SIZE
|
|
print(f"\n\nMonitor Summary")
|
|
print(f"{'=' * 40}")
|
|
print(f" Duration: {elapsed:.1f}s")
|
|
print(f" Packets: {total_packets:,}")
|
|
print(f" Bytes: {total_bytes:,}")
|
|
print(f" Unique PIDs: {len(known_pids)}")
|
|
print(f" CC errors: {cc_errors}")
|
|
print(f" TEI errors: {tei_count}")
|
|
|
|
if elapsed > 0:
|
|
avg_bitrate = (total_bytes * 8) / elapsed
|
|
if avg_bitrate >= 1e6:
|
|
print(f" Avg bitrate: {avg_bitrate / 1e6:.2f} Mbps")
|
|
else:
|
|
print(f" Avg bitrate: {avg_bitrate / 1e3:.1f} kbps")
|
|
|
|
|
|
# -- CLI --
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description="MPEG-2 Transport Stream analyzer for Genpix SkyWalker-1",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""\
|
|
examples:
|
|
%(prog)s capture.ts
|
|
%(prog)s analyze capture.ts
|
|
%(prog)s pids capture.ts
|
|
%(prog)s pat capture.ts
|
|
%(prog)s pmt capture.ts
|
|
%(prog)s dump capture.ts --pid 0x100 --count 5
|
|
%(prog)s monitor -
|
|
tune.py stream --stdout | %(prog)s monitor -
|
|
""")
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
help="Show sync search details and debug info")
|
|
|
|
sub = parser.add_subparsers(dest='command')
|
|
|
|
# analyze (default)
|
|
p_analyze = sub.add_parser('analyze', help="Full stream analysis (default)")
|
|
p_analyze.add_argument('input', help="TS file path or '-' for stdin")
|
|
p_analyze.add_argument('--max-packets', type=int, default=0,
|
|
help="Max packets to analyze (0 = all)")
|
|
|
|
# pids
|
|
p_pids = sub.add_parser('pids', help="Quick PID summary table")
|
|
p_pids.add_argument('input', help="TS file path or '-' for stdin")
|
|
p_pids.add_argument('--max-packets', type=int, default=0,
|
|
help="Max packets to analyze (0 = all)")
|
|
|
|
# pat
|
|
p_pat = sub.add_parser('pat', help="Parse Program Association Table")
|
|
p_pat.add_argument('input', help="TS file path or '-' for stdin")
|
|
|
|
# pmt
|
|
p_pmt = sub.add_parser('pmt', help="Parse Program Map Tables")
|
|
p_pmt.add_argument('input', help="TS file path or '-' for stdin")
|
|
|
|
# dump
|
|
p_dump = sub.add_parser('dump', help="Hex dump of TS packets")
|
|
p_dump.add_argument('input', help="TS file path or '-' for stdin")
|
|
p_dump.add_argument('--pid', type=lambda x: int(x, 0), default=None,
|
|
help="Filter by PID (hex: 0x100 or decimal: 256)")
|
|
p_dump.add_argument('--count', type=int, default=10,
|
|
help="Max packets to dump (default: 10)")
|
|
|
|
# monitor
|
|
p_monitor = sub.add_parser('monitor', help="Live stream monitoring")
|
|
p_monitor.add_argument('input', help="TS file path or '-' for stdin")
|
|
|
|
return parser
|
|
|
|
|
|
def main():
|
|
parser = build_parser()
|
|
|
|
# Handle bare filename without subcommand: default to 'analyze'
|
|
# Insert 'analyze' after any global flags but before the filename
|
|
subcmds = {'analyze', 'pids', 'pat', 'pmt', 'dump', 'monitor'}
|
|
argv = sys.argv[1:]
|
|
if argv:
|
|
first_pos = None
|
|
insert_idx = 0
|
|
for i, a in enumerate(argv):
|
|
if not a.startswith('-'):
|
|
first_pos = a
|
|
insert_idx = i
|
|
break
|
|
# Skip flag and its value if it takes one (currently none do)
|
|
insert_idx = i + 1
|
|
if first_pos is not None and first_pos not in subcmds:
|
|
argv.insert(insert_idx, 'analyze')
|
|
|
|
args = parser.parse_args(argv)
|
|
|
|
if not args.command:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
dispatch = {
|
|
'analyze': cmd_analyze,
|
|
'pids': cmd_pids,
|
|
'pat': cmd_pat,
|
|
'pmt': cmd_pmt,
|
|
'dump': cmd_dump,
|
|
'monitor': cmd_monitor,
|
|
}
|
|
|
|
handler = dispatch.get(args.command)
|
|
if handler is None:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
handler(args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|