diff --git a/src/mcserial/server.py b/src/mcserial/server.py index 2be675c..5438b6f 100644 --- a/src/mcserial/server.py +++ b/src/mcserial/server.py @@ -750,6 +750,249 @@ def set_break_condition(port: str, enabled: bool) -> dict: return {"error": str(e), "success": False} +@mcp.tool() +def detect_baud_rate( + port: str, + probe: str | None = None, + timeout_per_rate: float = 0.5, + baudrates: list[int] | None = None, + use_sync_pattern: bool = True, +) -> dict: + """Auto-detect baud rate using multiple heuristics. + + Uses several detection methods: + 1. 0x55 sync pattern analysis - The byte 0x55 (01010101 binary, 'U' char) + is self-synchronizing. At wrong baud rates it produces predictable + garbled patterns that reveal the baud rate ratio. + 2. Byte distribution analysis - Correct baud shows text-like distribution, + wrong baud shows uniform/random distribution. + 3. ASCII readability scoring - Percentage of printable characters. + 4. Framing indicators - Line endings, null bytes, valid UTF-8. + + Args: + port: Device path (will be opened temporarily if not already open) + probe: Optional string to send to trigger response. Use "U" or "UUUUU" + for sync-based detection on echo-enabled devices. + timeout_per_rate: Seconds to wait for data at each rate (default 0.5) + baudrates: Custom list of rates to try. Default: common rates + use_sync_pattern: Analyze 0x55 sync patterns (default True) + + Returns: + Detected baud rates sorted by confidence score + """ + import time + from collections import Counter + + # Standard baud rates to try (ordered by commonality) + default_rates = [ + 115200, 9600, 57600, 38400, 19200, # Most common first + 230400, 460800, 921600, # High speed + 4800, 2400, 1200, 300, # Legacy + 500000, 576000, 1000000, 1500000, # Non-standard high speed + ] + rates_to_try = baudrates or default_rates + + # Check if port is already open + was_open = port in _connections + original_baudrate = None + + if was_open: + original_baudrate = _connections[port].connection.baudrate + + results = [] + + def analyze_sync_pattern(data: bytes) -> dict: + """Analyze 0x55 sync patterns in the data. + + 0x55 = 01010101 binary. At correct baud rate, this pattern is preserved. + At wrong rates, it transforms predictably: + - 2x rate: 0x55 might become 0xFF, 0x00, or framing errors + - 0.5x rate: 0x55 becomes stretched patterns + """ + if not data: + return {"sync_score": 0, "sync_bytes": 0, "alternating_bits": 0} + + # Count 0x55 bytes (perfect sync) + sync_count = data.count(0x55) + + # Count bytes with alternating bit patterns (0x55, 0xAA, etc.) + alternating = sum(1 for b in data if b in (0x55, 0xAA, 0x33, 0xCC, 0x0F, 0xF0)) + + # Analyze bit transitions - correct baud has consistent timing + transitions = 0 + for b in data: + # Count bit transitions within byte + for i in range(7): + if ((b >> i) & 1) != ((b >> (i + 1)) & 1): + transitions += 1 + + avg_transitions = transitions / len(data) if data else 0 + # 0x55 has 7 transitions, random data has ~3.5 average + transition_score = min(100, (avg_transitions / 7) * 100) if avg_transitions > 3 else 0 + + sync_score = (sync_count / len(data)) * 100 if data else 0 + alt_score = (alternating / len(data)) * 100 if data else 0 + + return { + "sync_score": round(sync_score, 1), + "sync_bytes": sync_count, + "alternating_bits": round(alt_score, 1), + "transition_score": round(transition_score, 1), + } + + def analyze_distribution(data: bytes) -> dict: + """Analyze byte distribution for text-like patterns. + + Text data clusters around ASCII letters/digits (0x20-0x7E). + Wrong baud rate produces more uniform distribution. + """ + if not data or len(data) < 10: + return {"entropy": 0, "text_cluster": 0} + + counter = Counter(data) + total = len(data) + + # Calculate entropy (0 = uniform, higher = more random/wrong baud) + import math + entropy = 0 + for count in counter.values(): + if count > 0: + p = count / total + entropy -= p * math.log2(p) + + # Max entropy for 256 symbols is 8 bits + normalized_entropy = entropy / 8 * 100 + + # Check clustering around printable ASCII (0x20-0x7E) + printable_range = sum(counter.get(b, 0) for b in range(0x20, 0x7F)) + text_cluster = (printable_range / total) * 100 + + return { + "entropy": round(normalized_entropy, 1), + "text_cluster": round(text_cluster, 1), + } + + def score_data(data: bytes, use_sync: bool) -> dict: + """Comprehensive scoring of received data.""" + if not data: + return {"score": 0, "printable_pct": 0, "sample": "", "bytes_received": 0} + + # Basic readability + printable = sum(1 for b in data if 32 <= b <= 126 or b in (9, 10, 13)) + printable_pct = (printable / len(data)) * 100 + + has_newlines = b'\n' in data or b'\r' in data + null_pct = (data.count(0x00) / len(data)) * 100 + + # Start with printable percentage as base score + score = printable_pct + + # Sync pattern analysis + sync_info = {} + if use_sync: + sync_info = analyze_sync_pattern(data) + # Boost score if we see sync patterns + score += sync_info.get("sync_score", 0) * 0.5 + score += sync_info.get("transition_score", 0) * 0.3 + + # Distribution analysis + dist_info = analyze_distribution(data) + # High text clustering boosts score + score += dist_info.get("text_cluster", 0) * 0.3 + # High entropy (randomness) penalizes score - might be wrong baud + if dist_info.get("entropy", 0) > 70: + score -= 20 + + # Bonuses and penalties + if has_newlines: + score += 15 # Strong indicator of text protocol + if null_pct > 30: + score -= 40 # Lots of nulls = likely wrong baud + + # UTF-8 validity bonus + try: + decoded = data.decode('utf-8') + score += 10 + sample = decoded[:60].replace('\n', '\\n').replace('\r', '\\r') + except UnicodeDecodeError: + sample = data[:30].hex() + + return { + "score": round(max(0, min(100, score)), 1), + "printable_pct": round(printable_pct, 1), + "has_newlines": has_newlines, + "sample": sample, + "bytes_received": len(data), + **sync_info, + **dist_info, + } + + try: + for rate in rates_to_try: + try: + if was_open: + conn = _connections[port].connection + conn.baudrate = rate + conn.reset_input_buffer() + else: + conn = serial.Serial( + port=port, + baudrate=rate, + timeout=timeout_per_rate, + ) + + # Send probe if provided + if probe: + conn.write(probe.encode()) + conn.flush() + time.sleep(0.05) + + # Wait and collect data + time.sleep(timeout_per_rate) + available = conn.in_waiting + data = conn.read(available) if available else b"" + + # Score this rate + score_result = score_data(data, use_sync_pattern) + if score_result["bytes_received"] > 0: + results.append({ + "baudrate": rate, + **score_result, + }) + + if not was_open: + conn.close() + + except serial.SerialException: + continue + + # Restore original baud rate + if was_open and original_baudrate: + _connections[port].connection.baudrate = original_baudrate + _connections[port].connection.reset_input_buffer() + + # Sort by score + results.sort(key=lambda x: x["score"], reverse=True) + best = results[0] if results else None + + return { + "success": True, + "port": port, + "detected_baudrate": best["baudrate"] if best and best["score"] > 50 else None, + "confidence": best["score"] if best else 0, + "results": results[:5], + "rates_tested": len(rates_to_try), + "algorithm": "sync_pattern + distribution + readability analysis", + } + + except serial.SerialException as e: + if was_open and original_baudrate: + import contextlib + with contextlib.suppress(Exception): + _connections[port].connection.baudrate = original_baudrate + return {"error": str(e), "success": False} + + # ============================================================================ # RESOURCES - Dynamic data access via URIs # ============================================================================