Add URL handlers, pyserial feature parity, and cp2110 extra

- open_serial_port now accepts URL schemes (socket://, rfc2217://,
  loop://, spy://, cp2110://) via serial_for_url()
- cp2110 HID-to-UART support gated behind optional [cp2110] extra
- Add grep parameter to list_serial_ports for regex filtering
- Add read_serial_lines for batch multi-line reads
- Add cancel_read/cancel_write for pending I/O interruption
- Add set_flow_control for manual XON/XOFF gating
- Add break_condition to get_modem_lines response
This commit is contained in:
Ryan Malloy 2026-02-01 22:39:09 -07:00
parent b5bd2bb470
commit 5c655fe743
2 changed files with 244 additions and 23 deletions

View File

@ -24,6 +24,9 @@ dependencies = [
]
[project.optional-dependencies]
cp2110 = [
"hidapi>=0.14.0",
]
dev = [
"ruff>=0.9.0",
"pytest>=8.0.0",

View File

@ -226,21 +226,39 @@ Protocols: xmodem (128B blocks), xmodem1k, ymodem (batch), zmodem (streaming, re
@mcp.tool()
def list_serial_ports(usb_only: bool = True) -> list[dict]:
def list_serial_ports(usb_only: bool = True, grep: str | None = None) -> list[dict]:
"""List available serial ports on the system.
Args:
usb_only: If True (default), only show USB serial devices.
Set False to include legacy/phantom ttyS ports.
grep: Optional regex pattern to filter ports. Matches against device path,
description, hardware ID, manufacturer, product, and serial number.
Examples: "FTDI", "ttyUSB", "VID:PID=0403:6001", "CP210"
Returns a list of ports with their device path, description, and hardware ID.
Call this first to discover what ports are available before opening one.
"""
import re
# Compile grep pattern if provided
pattern = re.compile(grep, re.IGNORECASE) if grep else None
ports = []
for port in serial.tools.list_ports.comports():
# Filter out phantom/legacy ports if usb_only is True
if usb_only and port.hwid == "n/a":
continue
# Apply grep filter across all fields
if pattern:
searchable = " ".join(str(v) for v in [
port.device, port.description, port.hwid,
port.manufacturer, port.product, port.serial_number,
] if v)
if not pattern.search(searchable):
continue
ports.append({
"device": port.device,
"description": port.description,
@ -279,7 +297,12 @@ def open_serial_port(
sending data or responds to a probe string.
Args:
port: Device path (e.g., '/dev/ttyUSB0', 'COM3')
port: Device path (e.g., '/dev/ttyUSB0', 'COM3') or URL scheme:
- socket://host:port (raw TCP socket)
- rfc2217://host:port (Telnet COM Port Control)
- loop:// (loopback for testing)
- spy://device (debug wrapper, logs traffic)
- cp2110:// (Silicon Labs HID-to-UART, requires `hidapi`)
baudrate: Baud rate. If None, auto-detect (recommended for unknown devices)
bytesize: Data bits (5, 6, 7, or 8)
parity: Parity checking (N=None, E=Even, O=Odd, M=Mark, S=Space)
@ -303,10 +326,28 @@ def open_serial_port(
if len(_connections) >= MAX_CONNECTIONS:
return {"error": f"Maximum connections ({MAX_CONNECTIONS}) reached", "success": False}
# Auto-detect baud rate if not specified
# Detect URL scheme vs local device path
is_url = "://" in port
_SUPPORTED_SCHEMES = ("socket", "rfc2217", "loop", "spy", "cp2110", "alt", "hwgrep")
if is_url:
scheme = port.split("://", 1)[0].lower()
if scheme not in _SUPPORTED_SCHEMES:
return {"error": f"Unsupported URL scheme: {scheme}://", "success": False}
if scheme == "cp2110":
try:
import hid # noqa: F401
except ImportError:
return {
"error": "cp2110:// requires the hidapi package. "
"Install with: pip install mcserial[cp2110]",
"success": False,
}
# Auto-detect baud rate if not specified (only for local devices)
detected_info = None
if baudrate is None:
# Import the detect function's logic inline to avoid circular issues
if baudrate is None and not is_url:
detection_result = _detect_baud_rate_internal(
port=port,
probe=autobaud_probe,
@ -323,15 +364,30 @@ def open_serial_port(
],
}
else:
# Fall back to default if detection fails
baudrate = DEFAULT_BAUDRATE
detected_info = {
"auto_detected": False,
"fallback_reason": "No data received or low confidence",
"using_default": DEFAULT_BAUDRATE,
}
elif baudrate is None:
baudrate = DEFAULT_BAUDRATE
try:
if is_url:
conn = serial.serial_for_url(
port,
baudrate=baudrate,
bytesize=bytesize,
parity=parity,
stopbits=stopbits,
timeout=timeout,
write_timeout=write_timeout,
xonxoff=xonxoff,
rtscts=rtscts,
dsrdtr=dsrdtr,
)
else:
conn = serial.Serial(
port=port,
baudrate=baudrate,
@ -350,7 +406,7 @@ def open_serial_port(
result = {
"success": True,
"port": port,
"mode": "rs232", # Default mode
"mode": "rs232",
"baudrate": baudrate,
"bytesize": bytesize,
"parity": parity,
@ -358,15 +414,21 @@ def open_serial_port(
"xonxoff": xonxoff,
"rtscts": rtscts,
"dsrdtr": dsrdtr,
"exclusive": exclusive,
"resource_uri": f"serial://{port}/data",
"mode_hint": "Use set_port_mode() to switch to RS-485 mode if needed.",
}
if is_url:
result["url_scheme"] = port.split("://", 1)[0].lower()
result["hint"] = "Opened via URL handler. Some features (exclusive, auto-baud) are not available."
else:
result["exclusive"] = exclusive
result["mode_hint"] = "Use set_port_mode() to switch to RS-485 mode if needed."
if detected_info:
result["autobaud"] = detected_info
return result
except serial.SerialException as e:
return {"error": str(e), "success": False}
except OSError as e:
return {"error": f"Connection error: {e}", "success": False}
@mcp.tool()
@ -520,6 +582,55 @@ def read_serial_line(port: str, encoding: str = "utf-8") -> dict:
return {"error": str(e), "success": False}
@mcp.tool()
def read_serial_lines(
port: str,
max_lines: int = 10,
encoding: str = "utf-8",
) -> dict:
"""Read multiple lines from an open serial port.
Reads up to max_lines, stopping early if no more data is available
(readline returns empty due to timeout). Useful for draining buffered
output or reading multi-line responses.
Args:
port: Device path of the port to read from
max_lines: Maximum number of lines to read (default: 10)
encoding: Character encoding for decoding (default: utf-8)
Returns:
List of lines read from the port
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
if max_lines < 1 or max_lines > 1000:
return {"error": "max_lines must be 1-1000", "success": False}
try:
conn = _connections[port].connection
lines = []
total_bytes = 0
for _ in range(max_lines):
raw = conn.readline()
if not raw:
break
total_bytes += len(raw)
lines.append(raw.decode(encoding, errors="replace").rstrip("\r\n"))
return {
"success": True,
"lines": lines,
"count": len(lines),
"bytes_read": total_bytes,
"port": port,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def configure_serial(
port: str,
@ -657,6 +768,112 @@ def flush_serial(port: str, input_buffer: bool = True, output_buffer: bool = Tru
return {"error": str(e), "success": False}
@mcp.tool()
def cancel_read(port: str) -> dict:
"""Cancel any pending read operation on a serial port.
Interrupts a blocking read by calling cancel_read() on the underlying
pyserial connection. Only effective on platforms that support it
(POSIX with select-based reads).
Args:
port: Device path of the port
Returns:
Cancellation status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
conn.cancel_read()
return {"success": True, "port": port, "cancelled": "read"}
except (serial.SerialException, AttributeError) as e:
return {"error": str(e), "success": False}
@mcp.tool()
def cancel_write(port: str) -> dict:
"""Cancel any pending write operation on a serial port.
Interrupts a blocking write by calling cancel_write() on the underlying
pyserial connection. Only effective on platforms that support it
(POSIX with select-based writes).
Args:
port: Device path of the port
Returns:
Cancellation status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
conn.cancel_write()
return {"success": True, "port": port, "cancelled": "write"}
except (serial.SerialException, AttributeError) as e:
return {"error": str(e), "success": False}
@mcp.tool()
def set_flow_control(
port: str,
input_flow: bool | None = None,
output_flow: bool | None = None,
) -> dict:
"""Manually gate input/output flow control on a serial port.
When software (XON/XOFF) or hardware (RTS/CTS) flow control is enabled,
these calls manually send flow control signals:
- input_flow=True: Send XON (allow remote to send)
- input_flow=False: Send XOFF (tell remote to stop)
- output_flow=True: Resume our outgoing data
- output_flow=False: Pause our outgoing data
Flow control must already be enabled via configure_serial (xonxoff=True
or rtscts=True) for these to have effect.
WARNING: Not portable across all platforms.
Args:
port: Device path of the port
input_flow: Enable (XON) or disable (XOFF) incoming data (None = no change)
output_flow: Enable or disable outgoing data (None = no change)
Returns:
Flow control gate status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
changes = {}
if input_flow is not None:
conn.set_input_flow_control(input_flow)
changes["input_flow"] = input_flow
if output_flow is not None:
conn.set_output_flow_control(output_flow)
changes["output_flow"] = output_flow
return {
"success": True,
"port": port,
"changes": changes,
"flow_control_enabled": {
"xonxoff": conn.xonxoff,
"rtscts": conn.rtscts,
},
}
except (serial.SerialException, AttributeError) as e:
return {"error": str(e), "success": False}
@mcp.tool()
def set_port_mode(port: str, mode: Literal["rs232", "rs485"]) -> dict:
"""Switch a serial port between RS-232 and RS-485 modes.
@ -755,6 +972,7 @@ def get_modem_lines(port: str) -> dict:
"rts": conn.rts,
"dtr": conn.dtr,
},
"break_condition": conn.break_condition,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}