Add smart baud rate auto-detection

Uses multiple heuristics for reliable detection:
- 0x55 sync pattern analysis (self-synchronizing byte)
- Bit transition counting (0x55 has 7 transitions vs ~3.5 avg)
- Byte distribution entropy (text clusters vs random)
- ASCII readability scoring with UTF-8 validation
- Line ending and null byte detection

Supports optional probe string (use "UUUUU" for sync-based detection
on echo-enabled devices). Returns top 5 candidates with confidence scores.
This commit is contained in:
Ryan Malloy 2026-01-27 22:26:26 -07:00
parent f9f5f3527f
commit 8014b2833b

View File

@ -750,6 +750,249 @@ def set_break_condition(port: str, enabled: bool) -> dict:
return {"error": str(e), "success": False}
@mcp.tool()
def detect_baud_rate(
port: str,
probe: str | None = None,
timeout_per_rate: float = 0.5,
baudrates: list[int] | None = None,
use_sync_pattern: bool = True,
) -> dict:
"""Auto-detect baud rate using multiple heuristics.
Uses several detection methods:
1. 0x55 sync pattern analysis - The byte 0x55 (01010101 binary, 'U' char)
is self-synchronizing. At wrong baud rates it produces predictable
garbled patterns that reveal the baud rate ratio.
2. Byte distribution analysis - Correct baud shows text-like distribution,
wrong baud shows uniform/random distribution.
3. ASCII readability scoring - Percentage of printable characters.
4. Framing indicators - Line endings, null bytes, valid UTF-8.
Args:
port: Device path (will be opened temporarily if not already open)
probe: Optional string to send to trigger response. Use "U" or "UUUUU"
for sync-based detection on echo-enabled devices.
timeout_per_rate: Seconds to wait for data at each rate (default 0.5)
baudrates: Custom list of rates to try. Default: common rates
use_sync_pattern: Analyze 0x55 sync patterns (default True)
Returns:
Detected baud rates sorted by confidence score
"""
import time
from collections import Counter
# Standard baud rates to try (ordered by commonality)
default_rates = [
115200, 9600, 57600, 38400, 19200, # Most common first
230400, 460800, 921600, # High speed
4800, 2400, 1200, 300, # Legacy
500000, 576000, 1000000, 1500000, # Non-standard high speed
]
rates_to_try = baudrates or default_rates
# Check if port is already open
was_open = port in _connections
original_baudrate = None
if was_open:
original_baudrate = _connections[port].connection.baudrate
results = []
def analyze_sync_pattern(data: bytes) -> dict:
"""Analyze 0x55 sync patterns in the data.
0x55 = 01010101 binary. At correct baud rate, this pattern is preserved.
At wrong rates, it transforms predictably:
- 2x rate: 0x55 might become 0xFF, 0x00, or framing errors
- 0.5x rate: 0x55 becomes stretched patterns
"""
if not data:
return {"sync_score": 0, "sync_bytes": 0, "alternating_bits": 0}
# Count 0x55 bytes (perfect sync)
sync_count = data.count(0x55)
# Count bytes with alternating bit patterns (0x55, 0xAA, etc.)
alternating = sum(1 for b in data if b in (0x55, 0xAA, 0x33, 0xCC, 0x0F, 0xF0))
# Analyze bit transitions - correct baud has consistent timing
transitions = 0
for b in data:
# Count bit transitions within byte
for i in range(7):
if ((b >> i) & 1) != ((b >> (i + 1)) & 1):
transitions += 1
avg_transitions = transitions / len(data) if data else 0
# 0x55 has 7 transitions, random data has ~3.5 average
transition_score = min(100, (avg_transitions / 7) * 100) if avg_transitions > 3 else 0
sync_score = (sync_count / len(data)) * 100 if data else 0
alt_score = (alternating / len(data)) * 100 if data else 0
return {
"sync_score": round(sync_score, 1),
"sync_bytes": sync_count,
"alternating_bits": round(alt_score, 1),
"transition_score": round(transition_score, 1),
}
def analyze_distribution(data: bytes) -> dict:
"""Analyze byte distribution for text-like patterns.
Text data clusters around ASCII letters/digits (0x20-0x7E).
Wrong baud rate produces more uniform distribution.
"""
if not data or len(data) < 10:
return {"entropy": 0, "text_cluster": 0}
counter = Counter(data)
total = len(data)
# Calculate entropy (0 = uniform, higher = more random/wrong baud)
import math
entropy = 0
for count in counter.values():
if count > 0:
p = count / total
entropy -= p * math.log2(p)
# Max entropy for 256 symbols is 8 bits
normalized_entropy = entropy / 8 * 100
# Check clustering around printable ASCII (0x20-0x7E)
printable_range = sum(counter.get(b, 0) for b in range(0x20, 0x7F))
text_cluster = (printable_range / total) * 100
return {
"entropy": round(normalized_entropy, 1),
"text_cluster": round(text_cluster, 1),
}
def score_data(data: bytes, use_sync: bool) -> dict:
"""Comprehensive scoring of received data."""
if not data:
return {"score": 0, "printable_pct": 0, "sample": "", "bytes_received": 0}
# Basic readability
printable = sum(1 for b in data if 32 <= b <= 126 or b in (9, 10, 13))
printable_pct = (printable / len(data)) * 100
has_newlines = b'\n' in data or b'\r' in data
null_pct = (data.count(0x00) / len(data)) * 100
# Start with printable percentage as base score
score = printable_pct
# Sync pattern analysis
sync_info = {}
if use_sync:
sync_info = analyze_sync_pattern(data)
# Boost score if we see sync patterns
score += sync_info.get("sync_score", 0) * 0.5
score += sync_info.get("transition_score", 0) * 0.3
# Distribution analysis
dist_info = analyze_distribution(data)
# High text clustering boosts score
score += dist_info.get("text_cluster", 0) * 0.3
# High entropy (randomness) penalizes score - might be wrong baud
if dist_info.get("entropy", 0) > 70:
score -= 20
# Bonuses and penalties
if has_newlines:
score += 15 # Strong indicator of text protocol
if null_pct > 30:
score -= 40 # Lots of nulls = likely wrong baud
# UTF-8 validity bonus
try:
decoded = data.decode('utf-8')
score += 10
sample = decoded[:60].replace('\n', '\\n').replace('\r', '\\r')
except UnicodeDecodeError:
sample = data[:30].hex()
return {
"score": round(max(0, min(100, score)), 1),
"printable_pct": round(printable_pct, 1),
"has_newlines": has_newlines,
"sample": sample,
"bytes_received": len(data),
**sync_info,
**dist_info,
}
try:
for rate in rates_to_try:
try:
if was_open:
conn = _connections[port].connection
conn.baudrate = rate
conn.reset_input_buffer()
else:
conn = serial.Serial(
port=port,
baudrate=rate,
timeout=timeout_per_rate,
)
# Send probe if provided
if probe:
conn.write(probe.encode())
conn.flush()
time.sleep(0.05)
# Wait and collect data
time.sleep(timeout_per_rate)
available = conn.in_waiting
data = conn.read(available) if available else b""
# Score this rate
score_result = score_data(data, use_sync_pattern)
if score_result["bytes_received"] > 0:
results.append({
"baudrate": rate,
**score_result,
})
if not was_open:
conn.close()
except serial.SerialException:
continue
# Restore original baud rate
if was_open and original_baudrate:
_connections[port].connection.baudrate = original_baudrate
_connections[port].connection.reset_input_buffer()
# Sort by score
results.sort(key=lambda x: x["score"], reverse=True)
best = results[0] if results else None
return {
"success": True,
"port": port,
"detected_baudrate": best["baudrate"] if best and best["score"] > 50 else None,
"confidence": best["score"] if best else 0,
"results": results[:5],
"rates_tested": len(rates_to_try),
"algorithm": "sync_pattern + distribution + readability analysis",
}
except serial.SerialException as e:
if was_open and original_baudrate:
import contextlib
with contextlib.suppress(Exception):
_connections[port].connection.baudrate = original_baudrate
return {"error": str(e), "success": False}
# ============================================================================
# RESOURCES - Dynamic data access via URIs
# ============================================================================