Add RS-485 specific tools

New tools for RS-485 half-duplex communication:

- check_rs485_support: Detect hardware RS-485 capability via udev/ioctl
  Reports driver type (FTDI/CP210x/CH340), kernel support, and recommendations

- rs485_transact: Send request and receive response with automatic
  TX/RX turnaround timing. Handles manual RTS control when needed.

- rs485_scan_addresses: Scan bus for responding devices (1-247)
  Useful for Modbus device discovery
This commit is contained in:
Ryan Malloy 2026-01-27 22:42:40 -07:00
parent 948775968d
commit 974f2ee6dc

View File

@ -853,6 +853,297 @@ def set_rs485_mode(
return {"error": str(e), "success": False} return {"error": str(e), "success": False}
@mcp.tool()
def check_rs485_support(port: str) -> dict:
"""Check RS-485 hardware support for a serial port.
Detects whether the port/driver supports hardware RS-485 mode
(automatic DE/RE control) or requires software emulation.
Args:
port: Device path to check (doesn't need to be open)
Returns:
RS-485 capability information
"""
import subprocess
result = {
"port": port,
"driver": None,
"chip": None,
"hardware_rs485": False,
"software_fallback": True,
"notes": [],
}
# Try to get USB device info
try:
# Get the device's parent USB info via udevadm
udev_output = subprocess.run(
["udevadm", "info", "-a", "-n", port],
capture_output=True,
text=True,
timeout=5,
)
udev_info = udev_output.stdout.lower()
# Detect driver
if "ftdi" in udev_info:
result["driver"] = "ftdi_sio"
result["hardware_rs485"] = True
result["notes"].append("FTDI chips have hardware RS-485 auto-direction")
elif "cp210x" in udev_info or "silabs" in udev_info:
result["driver"] = "cp210x"
# Only some CP210x models support RS-485
if "cp2102" in udev_info:
result["chip"] = "CP2102"
result["hardware_rs485"] = False
result["notes"].append("CP2102 lacks hardware RS-485; use software RTS control")
elif "cp2105" in udev_info or "cp2108" in udev_info:
result["chip"] = "CP2105/CP2108"
result["hardware_rs485"] = True
result["notes"].append("CP2105/CP2108 have hardware RS-485 support")
else:
result["notes"].append("CP210x detected; RS-485 support varies by model")
elif "ch341" in udev_info or "ch340" in udev_info:
result["driver"] = "ch341"
result["hardware_rs485"] = False
result["notes"].append("CH340/CH341 lacks hardware RS-485; timing may be unreliable")
elif "pl2303" in udev_info:
result["driver"] = "pl2303"
result["hardware_rs485"] = False
result["notes"].append("PL2303 lacks hardware RS-485; use software RTS control")
elif "ttyS" in port or "ttyAMA" in port:
result["driver"] = "native"
result["hardware_rs485"] = True
result["notes"].append("Native UART; check if RS-485 transceiver is connected")
# Try to detect chip from product string
for line in udev_output.stdout.split('\n'):
if 'ATTRS{product}' in line:
result["chip"] = line.split('==')[1].strip(' "') if '==' in line else None
break
except (subprocess.TimeoutExpired, FileNotFoundError):
result["notes"].append("Could not query udev; install udevadm for better detection")
# Test if kernel RS-485 ioctl works
try:
import fcntl
# TIOCSRS485 = 0x542F, TIOCGRS485 = 0x542E
TIOCGRS485 = 0x542E
with open(port) as f:
try:
# Try to get current RS-485 settings
fcntl.ioctl(f.fileno(), TIOCGRS485, b'\x00' * 32)
result["kernel_rs485_ioctl"] = True
result["notes"].append("Kernel TIOCSRS485 ioctl supported")
except OSError:
result["kernel_rs485_ioctl"] = False
result["notes"].append("Kernel RS-485 ioctl not supported on this port")
except Exception as e:
result["kernel_rs485_ioctl"] = False
result["notes"].append(f"Could not test ioctl: {e}")
# Recommendation
if result["hardware_rs485"]:
result["recommendation"] = "Use set_rs485_mode() for automatic DE/RE control"
else:
result["recommendation"] = "Use manual RTS control via set_modem_lines() around writes"
result["success"] = True
return result
@mcp.tool()
def rs485_transact(
port: str,
data: str,
response_timeout: float = 1.0,
response_terminator: str | None = None,
response_length: int | None = None,
encoding: str = "utf-8",
turnaround_delay: float = 0.005,
) -> dict:
"""Send data and receive response on RS-485 bus (half-duplex transaction).
Handles the TXRX turnaround timing automatically. For devices without
hardware RS-485 support, manually controls RTS around the transaction.
Useful for request/response protocols like Modbus RTU.
Args:
port: Device path (must be open, preferably with RS-485 mode set)
data: Data to send
response_timeout: Max time to wait for response (seconds)
response_terminator: Stop reading when this sequence is received
response_length: Expected response length in bytes (alternative to terminator)
encoding: Character encoding for data
turnaround_delay: Delay after TX before reading (seconds)
Returns:
Transaction result with sent/received data
"""
import time
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
has_hw_rs485 = conn.rs485_mode is not None
# Clear any pending data
conn.reset_input_buffer()
# If no hardware RS-485, manually control RTS for TX
if not has_hw_rs485:
conn.rts = True # Enable driver (TX mode)
time.sleep(0.001) # Small settling time
# Send data
encoded = data.encode(encoding)
bytes_written = conn.write(encoded)
conn.flush()
# Wait for transmission to complete
# At slowest common rate (9600), 1 byte = ~1ms
tx_time = (bytes_written * 10) / conn.baudrate # 10 bits per byte typical
time.sleep(tx_time + turnaround_delay)
# Switch to RX mode if manual control
if not has_hw_rs485:
conn.rts = False # Disable driver (RX mode)
time.sleep(0.001)
# Read response
original_timeout = conn.timeout
conn.timeout = response_timeout
try:
if response_terminator:
term_bytes = response_terminator.encode(encoding)
raw_response = conn.read_until(term_bytes)
elif response_length:
raw_response = conn.read(response_length)
else:
# Wait for data, then read all available
time.sleep(response_timeout * 0.5)
available = conn.in_waiting
raw_response = conn.read(available) if available else b""
finally:
conn.timeout = original_timeout
return {
"success": True,
"port": port,
"bytes_sent": bytes_written,
"data_sent": data,
"response": raw_response.decode(encoding, errors="replace"),
"response_bytes": len(raw_response),
"response_hex": raw_response.hex(),
"hardware_rs485": has_hw_rs485,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def rs485_scan_addresses(
port: str,
start_address: int = 1,
end_address: int = 247,
probe_template: str = "{addr:02x}03000001",
response_timeout: float = 0.1,
encoding: str = "latin-1",
) -> dict:
"""Scan RS-485 bus for responding devices (address discovery).
Sends a probe message to each address and records which ones respond.
Useful for discovering Modbus or similar addressed devices on the bus.
Args:
port: Device path (must be open with RS-485 mode)
start_address: First address to scan (default 1)
end_address: Last address to scan (default 247, Modbus max)
probe_template: Message template with {addr} placeholder. Default is
a Modbus "read holding register" frame (customize for your protocol)
response_timeout: Time to wait for response per address (seconds)
encoding: Encoding for probe (use 'latin-1' for raw bytes)
Returns:
List of responding addresses
"""
import time
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
has_hw_rs485 = conn.rs485_mode is not None
responding = []
scanned = 0
for addr in range(start_address, end_address + 1):
# Format probe with address
try:
if "{addr" in probe_template:
probe = probe_template.format(addr=addr)
else:
probe = chr(addr) + probe_template
except (ValueError, KeyError):
probe = chr(addr) + probe_template
# Clear buffers
conn.reset_input_buffer()
# TX
if not has_hw_rs485:
conn.rts = True
time.sleep(0.001)
conn.write(probe.encode(encoding))
conn.flush()
tx_time = (len(probe) * 10) / conn.baudrate
time.sleep(tx_time + 0.002)
if not has_hw_rs485:
conn.rts = False
time.sleep(0.001)
# Wait for response
time.sleep(response_timeout)
if conn.in_waiting > 0:
response = conn.read(conn.in_waiting)
responding.append({
"address": addr,
"response_length": len(response),
"response_hex": response.hex()[:32], # First 16 bytes
})
scanned += 1
return {
"success": True,
"port": port,
"addresses_scanned": scanned,
"devices_found": len(responding),
"responding_addresses": responding,
"hardware_rs485": has_hw_rs485,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool() @mcp.tool()
def set_low_latency_mode(port: str, enabled: bool = True) -> dict: def set_low_latency_mode(port: str, enabled: bool = True) -> dict:
"""Enable or disable low-latency mode (Linux only). """Enable or disable low-latency mode (Linux only).