Add advanced pyserial features

Flow control:
- xonxoff, rtscts, dsrdtr in open_serial_port and configure_serial

Timeouts:
- write_timeout and inter_byte_timeout support

New tools:
- read_until: read until custom terminator (not just newline)
- set_rs485_mode: half-duplex RS-485 for industrial/Modbus
- set_low_latency_mode: reduce kernel buffering latency (Linux)
- set_break_condition: hold/release break state

Also:
- exclusive port locking option in open_serial_port
- updated get_connection_status with all new fields
This commit is contained in:
Ryan Malloy 2026-01-27 22:19:29 -07:00
parent 96d6e3e86b
commit f9f5f3527f

View File

@ -79,6 +79,12 @@ def open_serial_port(
parity: Literal["N", "E", "O", "M", "S"] = "N",
stopbits: Literal[1, 1.5, 2] = 1,
timeout: float = DEFAULT_TIMEOUT,
write_timeout: float | None = None,
inter_byte_timeout: float | None = None,
xonxoff: bool = False,
rtscts: bool = False,
dsrdtr: bool = False,
exclusive: bool = False,
) -> dict:
"""Open a serial port connection.
@ -89,6 +95,12 @@ def open_serial_port(
parity: Parity checking (N=None, E=Even, O=Odd, M=Mark, S=Space)
stopbits: Stop bits (1, 1.5, or 2)
timeout: Read timeout in seconds
write_timeout: Write timeout in seconds (None = blocking)
inter_byte_timeout: Timeout between bytes during read (None = disabled)
xonxoff: Enable software flow control (XON/XOFF)
rtscts: Enable hardware RTS/CTS flow control
dsrdtr: Enable hardware DSR/DTR flow control
exclusive: Request exclusive access (lock port from other processes)
Returns:
Connection status and details
@ -107,6 +119,12 @@ def open_serial_port(
parity=parity,
stopbits=stopbits,
timeout=timeout,
write_timeout=write_timeout,
inter_byte_timeout=inter_byte_timeout,
xonxoff=xonxoff,
rtscts=rtscts,
dsrdtr=dsrdtr,
exclusive=exclusive,
)
_connections[port] = SerialConnection(port=port, connection=conn)
return {
@ -116,6 +134,10 @@ def open_serial_port(
"bytesize": bytesize,
"parity": parity,
"stopbits": stopbits,
"xonxoff": xonxoff,
"rtscts": rtscts,
"dsrdtr": dsrdtr,
"exclusive": exclusive,
"resource_uri": f"serial://{port}/data",
}
except serial.SerialException as e:
@ -270,6 +292,11 @@ def configure_serial(
port: str,
baudrate: int | None = None,
timeout: float | None = None,
write_timeout: float | None = None,
inter_byte_timeout: float | None = None,
xonxoff: bool | None = None,
rtscts: bool | None = None,
dsrdtr: bool | None = None,
rts: bool | None = None,
dtr: bool | None = None,
) -> dict:
@ -279,6 +306,11 @@ def configure_serial(
port: Device path of the port to configure
baudrate: New baud rate (None = no change)
timeout: New read timeout in seconds (None = no change)
write_timeout: New write timeout in seconds (None = no change)
inter_byte_timeout: Timeout between bytes (None = no change)
xonxoff: Software flow control XON/XOFF (None = no change)
rtscts: Hardware RTS/CTS flow control (None = no change)
dsrdtr: Hardware DSR/DTR flow control (None = no change)
rts: Set RTS line state (None = no change)
dtr: Set DTR line state (None = no change)
@ -295,6 +327,16 @@ def configure_serial(
conn.baudrate = baudrate
if timeout is not None:
conn.timeout = timeout
if write_timeout is not None:
conn.write_timeout = write_timeout
if inter_byte_timeout is not None:
conn.inter_byte_timeout = inter_byte_timeout
if xonxoff is not None:
conn.xonxoff = xonxoff
if rtscts is not None:
conn.rtscts = rtscts
if dsrdtr is not None:
conn.dsrdtr = dsrdtr
if rts is not None:
conn.rts = rts
if dtr is not None:
@ -305,6 +347,11 @@ def configure_serial(
"port": port,
"baudrate": conn.baudrate,
"timeout": conn.timeout,
"write_timeout": conn.write_timeout,
"inter_byte_timeout": conn.inter_byte_timeout,
"xonxoff": conn.xonxoff,
"rtscts": conn.rtscts,
"dsrdtr": conn.dsrdtr,
"rts": conn.rts,
"dtr": conn.dtr,
}
@ -330,6 +377,11 @@ def get_connection_status() -> dict:
"parity": conn.parity,
"stopbits": conn.stopbits,
"timeout": conn.timeout,
"write_timeout": conn.write_timeout,
"inter_byte_timeout": conn.inter_byte_timeout,
"xonxoff": conn.xonxoff,
"rtscts": conn.rtscts,
"dsrdtr": conn.dsrdtr,
"in_waiting": conn.in_waiting,
"out_waiting": conn.out_waiting,
"cts": conn.cts,
@ -545,6 +597,159 @@ def send_break(port: str, duration_ms: int = 250) -> dict:
return {"error": str(e), "success": False}
@mcp.tool()
def read_until(
port: str,
terminator: str = "\n",
size: int | None = None,
encoding: str = "utf-8",
) -> dict:
"""Read data until a specific terminator sequence is received.
More flexible than read_serial_line - supports any terminator, not just newline.
Args:
port: Device path of the port to read from
terminator: Byte sequence to stop at (default: newline)
size: Maximum bytes to read (None = no limit, use timeout)
encoding: Character encoding for terminator and result
Returns:
Data read up to and including the terminator
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
term_bytes = terminator.encode(encoding)
raw = conn.read_until(term_bytes, size)
return {
"success": True,
"data": raw.decode(encoding, errors="replace"),
"bytes_read": len(raw),
"raw_hex": raw.hex(),
"port": port,
"terminator_found": raw.endswith(term_bytes),
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def set_rs485_mode(
port: str,
enabled: bool = True,
delay_before_tx: float = 0.0,
delay_before_rx: float = 0.0,
rts_level_for_tx: bool = True,
rts_level_for_rx: bool = False,
loopback: bool = False,
) -> dict:
"""Configure RS-485 mode for half-duplex communication.
RS-485 is commonly used in industrial applications (Modbus, etc.)
where multiple devices share a bus. The driver must control the
TX enable line (typically RTS) to switch between transmit and receive.
Args:
port: Device path of the port
enabled: Enable or disable RS-485 mode
delay_before_tx: Delay in seconds before enabling TX
delay_before_rx: Delay in seconds before enabling RX after TX
rts_level_for_tx: RTS level when transmitting (True=high)
rts_level_for_rx: RTS level when receiving (True=high)
loopback: Enable RS-485 loopback mode
Returns:
RS-485 configuration status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
if enabled:
# Create RS485Settings
rs485_settings = serial.rs485.RS485Settings(
rts_level_for_tx=rts_level_for_tx,
rts_level_for_rx=rts_level_for_rx,
loopback=loopback,
delay_before_tx=delay_before_tx,
delay_before_rx=delay_before_rx,
)
conn.rs485_mode = rs485_settings
else:
conn.rs485_mode = None
return {
"success": True,
"port": port,
"rs485_enabled": enabled,
"rts_level_for_tx": rts_level_for_tx if enabled else None,
"rts_level_for_rx": rts_level_for_rx if enabled else None,
"delay_before_tx": delay_before_tx if enabled else None,
"delay_before_rx": delay_before_rx if enabled else None,
"loopback": loopback if enabled else None,
}
except (serial.SerialException, AttributeError) as e:
return {"error": str(e), "success": False}
@mcp.tool()
def set_low_latency_mode(port: str, enabled: bool = True) -> dict:
"""Enable or disable low-latency mode (Linux only).
Reduces latency by changing how the kernel buffers data.
Useful for time-critical applications.
Note: This is a Linux-specific feature and may not work on all systems.
Args:
port: Device path of the port
enabled: Enable (True) or disable (False) low latency mode
Returns:
Operation status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
conn.set_low_latency_mode(enabled)
return {"success": True, "port": port, "low_latency": enabled}
except (serial.SerialException, AttributeError, OSError) as e:
return {"error": f"Low latency mode not supported: {e}", "success": False}
@mcp.tool()
def set_break_condition(port: str, enabled: bool) -> dict:
"""Set or clear the break condition on a serial port.
Unlike send_break() which sends a timed pulse, this holds the
break condition until explicitly cleared. Useful for protocols
that require sustained break states.
Args:
port: Device path of the port
enabled: True to assert break (hold TX low), False to release
Returns:
Break condition status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
conn.break_condition = enabled
return {"success": True, "port": port, "break_condition": enabled}
except serial.SerialException as e:
return {"error": str(e), "success": False}
# ============================================================================
# RESOURCES - Dynamic data access via URIs
# ============================================================================