From 974f2ee6dc16ca42ca1428bc26791bc1157862bb Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 27 Jan 2026 22:42:40 -0700 Subject: [PATCH] Add RS-485 specific tools New tools for RS-485 half-duplex communication: - check_rs485_support: Detect hardware RS-485 capability via udev/ioctl Reports driver type (FTDI/CP210x/CH340), kernel support, and recommendations - rs485_transact: Send request and receive response with automatic TX/RX turnaround timing. Handles manual RTS control when needed. - rs485_scan_addresses: Scan bus for responding devices (1-247) Useful for Modbus device discovery --- src/mcserial/server.py | 291 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 291 insertions(+) diff --git a/src/mcserial/server.py b/src/mcserial/server.py index 057853b..eae1a91 100644 --- a/src/mcserial/server.py +++ b/src/mcserial/server.py @@ -853,6 +853,297 @@ def set_rs485_mode( return {"error": str(e), "success": False} +@mcp.tool() +def check_rs485_support(port: str) -> dict: + """Check RS-485 hardware support for a serial port. + + Detects whether the port/driver supports hardware RS-485 mode + (automatic DE/RE control) or requires software emulation. + + Args: + port: Device path to check (doesn't need to be open) + + Returns: + RS-485 capability information + """ + import subprocess + + result = { + "port": port, + "driver": None, + "chip": None, + "hardware_rs485": False, + "software_fallback": True, + "notes": [], + } + + # Try to get USB device info + try: + # Get the device's parent USB info via udevadm + udev_output = subprocess.run( + ["udevadm", "info", "-a", "-n", port], + capture_output=True, + text=True, + timeout=5, + ) + udev_info = udev_output.stdout.lower() + + # Detect driver + if "ftdi" in udev_info: + result["driver"] = "ftdi_sio" + result["hardware_rs485"] = True + result["notes"].append("FTDI chips have hardware RS-485 auto-direction") + elif "cp210x" in udev_info or "silabs" in udev_info: + result["driver"] = "cp210x" + # Only some CP210x models support RS-485 + if "cp2102" in udev_info: + result["chip"] = "CP2102" + result["hardware_rs485"] = False + result["notes"].append("CP2102 lacks hardware RS-485; use software RTS control") + elif "cp2105" in udev_info or "cp2108" in udev_info: + result["chip"] = "CP2105/CP2108" + result["hardware_rs485"] = True + result["notes"].append("CP2105/CP2108 have hardware RS-485 support") + else: + result["notes"].append("CP210x detected; RS-485 support varies by model") + elif "ch341" in udev_info or "ch340" in udev_info: + result["driver"] = "ch341" + result["hardware_rs485"] = False + result["notes"].append("CH340/CH341 lacks hardware RS-485; timing may be unreliable") + elif "pl2303" in udev_info: + result["driver"] = "pl2303" + result["hardware_rs485"] = False + result["notes"].append("PL2303 lacks hardware RS-485; use software RTS control") + elif "ttyS" in port or "ttyAMA" in port: + result["driver"] = "native" + result["hardware_rs485"] = True + result["notes"].append("Native UART; check if RS-485 transceiver is connected") + + # Try to detect chip from product string + for line in udev_output.stdout.split('\n'): + if 'ATTRS{product}' in line: + result["chip"] = line.split('==')[1].strip(' "') if '==' in line else None + break + + except (subprocess.TimeoutExpired, FileNotFoundError): + result["notes"].append("Could not query udev; install udevadm for better detection") + + # Test if kernel RS-485 ioctl works + try: + import fcntl + + # TIOCSRS485 = 0x542F, TIOCGRS485 = 0x542E + TIOCGRS485 = 0x542E + + with open(port) as f: + try: + # Try to get current RS-485 settings + fcntl.ioctl(f.fileno(), TIOCGRS485, b'\x00' * 32) + result["kernel_rs485_ioctl"] = True + result["notes"].append("Kernel TIOCSRS485 ioctl supported") + except OSError: + result["kernel_rs485_ioctl"] = False + result["notes"].append("Kernel RS-485 ioctl not supported on this port") + except Exception as e: + result["kernel_rs485_ioctl"] = False + result["notes"].append(f"Could not test ioctl: {e}") + + # Recommendation + if result["hardware_rs485"]: + result["recommendation"] = "Use set_rs485_mode() for automatic DE/RE control" + else: + result["recommendation"] = "Use manual RTS control via set_modem_lines() around writes" + + result["success"] = True + return result + + +@mcp.tool() +def rs485_transact( + port: str, + data: str, + response_timeout: float = 1.0, + response_terminator: str | None = None, + response_length: int | None = None, + encoding: str = "utf-8", + turnaround_delay: float = 0.005, +) -> dict: + """Send data and receive response on RS-485 bus (half-duplex transaction). + + Handles the TX→RX turnaround timing automatically. For devices without + hardware RS-485 support, manually controls RTS around the transaction. + + Useful for request/response protocols like Modbus RTU. + + Args: + port: Device path (must be open, preferably with RS-485 mode set) + data: Data to send + response_timeout: Max time to wait for response (seconds) + response_terminator: Stop reading when this sequence is received + response_length: Expected response length in bytes (alternative to terminator) + encoding: Character encoding for data + turnaround_delay: Delay after TX before reading (seconds) + + Returns: + Transaction result with sent/received data + """ + import time + + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + has_hw_rs485 = conn.rs485_mode is not None + + # Clear any pending data + conn.reset_input_buffer() + + # If no hardware RS-485, manually control RTS for TX + if not has_hw_rs485: + conn.rts = True # Enable driver (TX mode) + time.sleep(0.001) # Small settling time + + # Send data + encoded = data.encode(encoding) + bytes_written = conn.write(encoded) + conn.flush() + + # Wait for transmission to complete + # At slowest common rate (9600), 1 byte = ~1ms + tx_time = (bytes_written * 10) / conn.baudrate # 10 bits per byte typical + time.sleep(tx_time + turnaround_delay) + + # Switch to RX mode if manual control + if not has_hw_rs485: + conn.rts = False # Disable driver (RX mode) + time.sleep(0.001) + + # Read response + original_timeout = conn.timeout + conn.timeout = response_timeout + + try: + if response_terminator: + term_bytes = response_terminator.encode(encoding) + raw_response = conn.read_until(term_bytes) + elif response_length: + raw_response = conn.read(response_length) + else: + # Wait for data, then read all available + time.sleep(response_timeout * 0.5) + available = conn.in_waiting + raw_response = conn.read(available) if available else b"" + finally: + conn.timeout = original_timeout + + return { + "success": True, + "port": port, + "bytes_sent": bytes_written, + "data_sent": data, + "response": raw_response.decode(encoding, errors="replace"), + "response_bytes": len(raw_response), + "response_hex": raw_response.hex(), + "hardware_rs485": has_hw_rs485, + } + + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def rs485_scan_addresses( + port: str, + start_address: int = 1, + end_address: int = 247, + probe_template: str = "{addr:02x}03000001", + response_timeout: float = 0.1, + encoding: str = "latin-1", +) -> dict: + """Scan RS-485 bus for responding devices (address discovery). + + Sends a probe message to each address and records which ones respond. + Useful for discovering Modbus or similar addressed devices on the bus. + + Args: + port: Device path (must be open with RS-485 mode) + start_address: First address to scan (default 1) + end_address: Last address to scan (default 247, Modbus max) + probe_template: Message template with {addr} placeholder. Default is + a Modbus "read holding register" frame (customize for your protocol) + response_timeout: Time to wait for response per address (seconds) + encoding: Encoding for probe (use 'latin-1' for raw bytes) + + Returns: + List of responding addresses + """ + import time + + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + has_hw_rs485 = conn.rs485_mode is not None + + responding = [] + scanned = 0 + + for addr in range(start_address, end_address + 1): + # Format probe with address + try: + if "{addr" in probe_template: + probe = probe_template.format(addr=addr) + else: + probe = chr(addr) + probe_template + except (ValueError, KeyError): + probe = chr(addr) + probe_template + + # Clear buffers + conn.reset_input_buffer() + + # TX + if not has_hw_rs485: + conn.rts = True + time.sleep(0.001) + + conn.write(probe.encode(encoding)) + conn.flush() + + tx_time = (len(probe) * 10) / conn.baudrate + time.sleep(tx_time + 0.002) + + if not has_hw_rs485: + conn.rts = False + time.sleep(0.001) + + # Wait for response + time.sleep(response_timeout) + + if conn.in_waiting > 0: + response = conn.read(conn.in_waiting) + responding.append({ + "address": addr, + "response_length": len(response), + "response_hex": response.hex()[:32], # First 16 bytes + }) + + scanned += 1 + + return { + "success": True, + "port": port, + "addresses_scanned": scanned, + "devices_found": len(responding), + "responding_addresses": responding, + "hardware_rs485": has_hw_rs485, + } + + except serial.SerialException as e: + return {"error": str(e), "success": False} + + @mcp.tool() def set_low_latency_mode(port: str, enabled: bool = True) -> dict: """Enable or disable low-latency mode (Linux only).