Add RS-232 modem control line tools

New tools for hardware control:
- get_modem_lines: read all input (CTS/DSR/RI/CD) and output (RTS/DTR) lines
- set_modem_lines: set RTS/DTR states
- pulse_line: pulse RTS or DTR for reset sequences (ESP32, Arduino, etc.)
- send_break: send serial break signal

Also added RI and CD to get_connection_status and port status resource.
This commit is contained in:
Ryan Malloy 2026-01-27 22:15:03 -07:00
parent 1a26109fc1
commit 96d6e3e86b

View File

@ -334,6 +334,8 @@ def get_connection_status() -> dict:
"out_waiting": conn.out_waiting, "out_waiting": conn.out_waiting,
"cts": conn.cts, "cts": conn.cts,
"dsr": conn.dsr, "dsr": conn.dsr,
"ri": conn.ri,
"cd": conn.cd,
"rts": conn.rts, "rts": conn.rts,
"dtr": conn.dtr, "dtr": conn.dtr,
"resource_uri": f"serial://{port}/data", "resource_uri": f"serial://{port}/data",
@ -369,6 +371,180 @@ def flush_serial(port: str, input_buffer: bool = True, output_buffer: bool = Tru
return {"error": str(e), "success": False} return {"error": str(e), "success": False}
@mcp.tool()
def get_modem_lines(port: str) -> dict:
"""Get all RS-232 modem control/status line states.
Input lines (directly readable from device):
- CTS: Clear To Send (device ready to receive)
- DSR: Data Set Ready (device is present/powered)
- RI: Ring Indicator (incoming call on modem)
- CD: Carrier Detect (connection established)
Output lines (set by us):
- RTS: Request To Send (we're ready to receive)
- DTR: Data Terminal Ready (we're present/active)
Args:
port: Device path of the port to query
Returns:
All modem line states
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
return {
"success": True,
"port": port,
"input_lines": {
"cts": conn.cts,
"dsr": conn.dsr,
"ri": conn.ri,
"cd": conn.cd,
},
"output_lines": {
"rts": conn.rts,
"dtr": conn.dtr,
},
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def set_modem_lines(
port: str,
rts: bool | None = None,
dtr: bool | None = None,
) -> dict:
"""Set RS-232 output control lines (RTS and DTR).
These lines can be used for:
- Hardware flow control
- Device reset sequences (many boards use DTR for reset)
- Power control on some devices
- Custom signaling protocols
Args:
port: Device path of the port
rts: Set RTS (Request To Send) state, None = no change
dtr: Set DTR (Data Terminal Ready) state, None = no change
Returns:
Updated line states
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
if rts is not None:
conn.rts = rts
if dtr is not None:
conn.dtr = dtr
return {
"success": True,
"port": port,
"rts": conn.rts,
"dtr": conn.dtr,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def pulse_line(
port: str,
line: Literal["rts", "dtr"],
duration_ms: int = 100,
active_low: bool = True,
) -> dict:
"""Pulse an RS-232 control line (useful for reset sequences).
Many devices use DTR or RTS for reset:
- ESP32/ESP8266: DTR low + RTS sequence for bootloader
- Arduino: DTR pulse for reset
- Custom hardware: Various reset/trigger signals
Args:
port: Device path of the port
line: Which line to pulse ("rts" or "dtr")
duration_ms: Pulse duration in milliseconds (default 100)
active_low: If True, pulse LOW then HIGH. If False, pulse HIGH then LOW.
Returns:
Pulse operation status
"""
import time
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
if duration_ms < 1 or duration_ms > 5000:
return {"error": "duration_ms must be between 1 and 5000", "success": False}
try:
conn = _connections[port].connection
# Get current state
if line == "rts":
original = conn.rts
conn.rts = not active_low # Start state
time.sleep(duration_ms / 1000.0)
conn.rts = active_low # Pulse state
time.sleep(duration_ms / 1000.0)
conn.rts = original # Restore
else: # dtr
original = conn.dtr
conn.dtr = not active_low
time.sleep(duration_ms / 1000.0)
conn.dtr = active_low
time.sleep(duration_ms / 1000.0)
conn.dtr = original
return {
"success": True,
"port": port,
"line": line,
"duration_ms": duration_ms,
"active_low": active_low,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def send_break(port: str, duration_ms: int = 250) -> dict:
"""Send a serial break signal.
A break is a sustained low signal longer than a character frame,
used to get attention of remote device or trigger special modes.
Args:
port: Device path of the port
duration_ms: Break duration in milliseconds (default 250)
Returns:
Break operation status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
if duration_ms < 1 or duration_ms > 5000:
return {"error": "duration_ms must be between 1 and 5000", "success": False}
try:
conn = _connections[port].connection
conn.send_break(duration=duration_ms / 1000.0)
return {"success": True, "port": port, "duration_ms": duration_ms}
except serial.SerialException as e:
return {"error": str(e), "success": False}
# ============================================================================ # ============================================================================
# RESOURCES - Dynamic data access via URIs # RESOURCES - Dynamic data access via URIs
# ============================================================================ # ============================================================================
@ -435,6 +611,8 @@ def resource_port_status(port: str) -> str:
f"- Bytes waiting (out): {conn.out_waiting}", f"- Bytes waiting (out): {conn.out_waiting}",
f"- CTS: {conn.cts}", f"- CTS: {conn.cts}",
f"- DSR: {conn.dsr}", f"- DSR: {conn.dsr}",
f"- RI: {conn.ri}",
f"- CD: {conn.cd}",
f"- RTS: {conn.rts}", f"- RTS: {conn.rts}",
f"- DTR: {conn.dtr}", f"- DTR: {conn.dtr}",
] ]