From 5c655fe743aec1db4ef6d690bc3ceb3288e4f65a Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sun, 1 Feb 2026 22:39:09 -0700 Subject: [PATCH] Add URL handlers, pyserial feature parity, and cp2110 extra - open_serial_port now accepts URL schemes (socket://, rfc2217://, loop://, spy://, cp2110://) via serial_for_url() - cp2110 HID-to-UART support gated behind optional [cp2110] extra - Add grep parameter to list_serial_ports for regex filtering - Add read_serial_lines for batch multi-line reads - Add cancel_read/cancel_write for pending I/O interruption - Add set_flow_control for manual XON/XOFF gating - Add break_condition to get_modem_lines response --- pyproject.toml | 3 + src/mcserial/server.py | 264 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 244 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9b59ce4..01e0dff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,9 @@ dependencies = [ ] [project.optional-dependencies] +cp2110 = [ + "hidapi>=0.14.0", +] dev = [ "ruff>=0.9.0", "pytest>=8.0.0", diff --git a/src/mcserial/server.py b/src/mcserial/server.py index 1db3a12..98f6d36 100644 --- a/src/mcserial/server.py +++ b/src/mcserial/server.py @@ -226,21 +226,39 @@ Protocols: xmodem (128B blocks), xmodem1k, ymodem (batch), zmodem (streaming, re @mcp.tool() -def list_serial_ports(usb_only: bool = True) -> list[dict]: +def list_serial_ports(usb_only: bool = True, grep: str | None = None) -> list[dict]: """List available serial ports on the system. Args: usb_only: If True (default), only show USB serial devices. Set False to include legacy/phantom ttyS ports. + grep: Optional regex pattern to filter ports. Matches against device path, + description, hardware ID, manufacturer, product, and serial number. + Examples: "FTDI", "ttyUSB", "VID:PID=0403:6001", "CP210" Returns a list of ports with their device path, description, and hardware ID. Call this first to discover what ports are available before opening one. """ + import re + + # Compile grep pattern if provided + pattern = re.compile(grep, re.IGNORECASE) if grep else None + ports = [] for port in serial.tools.list_ports.comports(): # Filter out phantom/legacy ports if usb_only is True if usb_only and port.hwid == "n/a": continue + + # Apply grep filter across all fields + if pattern: + searchable = " ".join(str(v) for v in [ + port.device, port.description, port.hwid, + port.manufacturer, port.product, port.serial_number, + ] if v) + if not pattern.search(searchable): + continue + ports.append({ "device": port.device, "description": port.description, @@ -279,7 +297,12 @@ def open_serial_port( sending data or responds to a probe string. Args: - port: Device path (e.g., '/dev/ttyUSB0', 'COM3') + port: Device path (e.g., '/dev/ttyUSB0', 'COM3') or URL scheme: + - socket://host:port (raw TCP socket) + - rfc2217://host:port (Telnet COM Port Control) + - loop:// (loopback for testing) + - spy://device (debug wrapper, logs traffic) + - cp2110:// (Silicon Labs HID-to-UART, requires `hidapi`) baudrate: Baud rate. If None, auto-detect (recommended for unknown devices) bytesize: Data bits (5, 6, 7, or 8) parity: Parity checking (N=None, E=Even, O=Odd, M=Mark, S=Space) @@ -303,10 +326,28 @@ def open_serial_port( if len(_connections) >= MAX_CONNECTIONS: return {"error": f"Maximum connections ({MAX_CONNECTIONS}) reached", "success": False} - # Auto-detect baud rate if not specified + # Detect URL scheme vs local device path + is_url = "://" in port + _SUPPORTED_SCHEMES = ("socket", "rfc2217", "loop", "spy", "cp2110", "alt", "hwgrep") + + if is_url: + scheme = port.split("://", 1)[0].lower() + if scheme not in _SUPPORTED_SCHEMES: + return {"error": f"Unsupported URL scheme: {scheme}://", "success": False} + + if scheme == "cp2110": + try: + import hid # noqa: F401 + except ImportError: + return { + "error": "cp2110:// requires the hidapi package. " + "Install with: pip install mcserial[cp2110]", + "success": False, + } + + # Auto-detect baud rate if not specified (only for local devices) detected_info = None - if baudrate is None: - # Import the detect function's logic inline to avoid circular issues + if baudrate is None and not is_url: detection_result = _detect_baud_rate_internal( port=port, probe=autobaud_probe, @@ -323,34 +364,49 @@ def open_serial_port( ], } else: - # Fall back to default if detection fails baudrate = DEFAULT_BAUDRATE detected_info = { "auto_detected": False, "fallback_reason": "No data received or low confidence", "using_default": DEFAULT_BAUDRATE, } + elif baudrate is None: + baudrate = DEFAULT_BAUDRATE try: - conn = serial.Serial( - port=port, - baudrate=baudrate, - bytesize=bytesize, - parity=parity, - stopbits=stopbits, - timeout=timeout, - write_timeout=write_timeout, - inter_byte_timeout=inter_byte_timeout, - xonxoff=xonxoff, - rtscts=rtscts, - dsrdtr=dsrdtr, - exclusive=exclusive, - ) + if is_url: + conn = serial.serial_for_url( + port, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits, + timeout=timeout, + write_timeout=write_timeout, + xonxoff=xonxoff, + rtscts=rtscts, + dsrdtr=dsrdtr, + ) + else: + conn = serial.Serial( + port=port, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits, + timeout=timeout, + write_timeout=write_timeout, + inter_byte_timeout=inter_byte_timeout, + xonxoff=xonxoff, + rtscts=rtscts, + dsrdtr=dsrdtr, + exclusive=exclusive, + ) _connections[port] = SerialConnection(port=port, connection=conn) result = { "success": True, "port": port, - "mode": "rs232", # Default mode + "mode": "rs232", "baudrate": baudrate, "bytesize": bytesize, "parity": parity, @@ -358,15 +414,21 @@ def open_serial_port( "xonxoff": xonxoff, "rtscts": rtscts, "dsrdtr": dsrdtr, - "exclusive": exclusive, "resource_uri": f"serial://{port}/data", - "mode_hint": "Use set_port_mode() to switch to RS-485 mode if needed.", } + if is_url: + result["url_scheme"] = port.split("://", 1)[0].lower() + result["hint"] = "Opened via URL handler. Some features (exclusive, auto-baud) are not available." + else: + result["exclusive"] = exclusive + result["mode_hint"] = "Use set_port_mode() to switch to RS-485 mode if needed." if detected_info: result["autobaud"] = detected_info return result except serial.SerialException as e: return {"error": str(e), "success": False} + except OSError as e: + return {"error": f"Connection error: {e}", "success": False} @mcp.tool() @@ -520,6 +582,55 @@ def read_serial_line(port: str, encoding: str = "utf-8") -> dict: return {"error": str(e), "success": False} +@mcp.tool() +def read_serial_lines( + port: str, + max_lines: int = 10, + encoding: str = "utf-8", +) -> dict: + """Read multiple lines from an open serial port. + + Reads up to max_lines, stopping early if no more data is available + (readline returns empty due to timeout). Useful for draining buffered + output or reading multi-line responses. + + Args: + port: Device path of the port to read from + max_lines: Maximum number of lines to read (default: 10) + encoding: Character encoding for decoding (default: utf-8) + + Returns: + List of lines read from the port + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + if max_lines < 1 or max_lines > 1000: + return {"error": "max_lines must be 1-1000", "success": False} + + try: + conn = _connections[port].connection + lines = [] + total_bytes = 0 + + for _ in range(max_lines): + raw = conn.readline() + if not raw: + break + total_bytes += len(raw) + lines.append(raw.decode(encoding, errors="replace").rstrip("\r\n")) + + return { + "success": True, + "lines": lines, + "count": len(lines), + "bytes_read": total_bytes, + "port": port, + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + @mcp.tool() def configure_serial( port: str, @@ -657,6 +768,112 @@ def flush_serial(port: str, input_buffer: bool = True, output_buffer: bool = Tru return {"error": str(e), "success": False} +@mcp.tool() +def cancel_read(port: str) -> dict: + """Cancel any pending read operation on a serial port. + + Interrupts a blocking read by calling cancel_read() on the underlying + pyserial connection. Only effective on platforms that support it + (POSIX with select-based reads). + + Args: + port: Device path of the port + + Returns: + Cancellation status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + conn.cancel_read() + return {"success": True, "port": port, "cancelled": "read"} + except (serial.SerialException, AttributeError) as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def cancel_write(port: str) -> dict: + """Cancel any pending write operation on a serial port. + + Interrupts a blocking write by calling cancel_write() on the underlying + pyserial connection. Only effective on platforms that support it + (POSIX with select-based writes). + + Args: + port: Device path of the port + + Returns: + Cancellation status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + conn.cancel_write() + return {"success": True, "port": port, "cancelled": "write"} + except (serial.SerialException, AttributeError) as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def set_flow_control( + port: str, + input_flow: bool | None = None, + output_flow: bool | None = None, +) -> dict: + """Manually gate input/output flow control on a serial port. + + When software (XON/XOFF) or hardware (RTS/CTS) flow control is enabled, + these calls manually send flow control signals: + + - input_flow=True: Send XON (allow remote to send) + - input_flow=False: Send XOFF (tell remote to stop) + - output_flow=True: Resume our outgoing data + - output_flow=False: Pause our outgoing data + + Flow control must already be enabled via configure_serial (xonxoff=True + or rtscts=True) for these to have effect. + + WARNING: Not portable across all platforms. + + Args: + port: Device path of the port + input_flow: Enable (XON) or disable (XOFF) incoming data (None = no change) + output_flow: Enable or disable outgoing data (None = no change) + + Returns: + Flow control gate status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + changes = {} + + if input_flow is not None: + conn.set_input_flow_control(input_flow) + changes["input_flow"] = input_flow + if output_flow is not None: + conn.set_output_flow_control(output_flow) + changes["output_flow"] = output_flow + + return { + "success": True, + "port": port, + "changes": changes, + "flow_control_enabled": { + "xonxoff": conn.xonxoff, + "rtscts": conn.rtscts, + }, + } + except (serial.SerialException, AttributeError) as e: + return {"error": str(e), "success": False} + + @mcp.tool() def set_port_mode(port: str, mode: Literal["rs232", "rs485"]) -> dict: """Switch a serial port between RS-232 and RS-485 modes. @@ -755,6 +972,7 @@ def get_modem_lines(port: str) -> dict: "rts": conn.rts, "dtr": conn.dtr, }, + "break_condition": conn.break_condition, } except serial.SerialException as e: return {"error": str(e), "success": False}